#![allow(non_snake_case)]
use std::collections::{HashMap, HashSet};
use aok::{Result, OK};
use futures::{stream::FuturesUnordered, StreamExt};
use hook::hook;
use mysql_macro::q;
use paste::paste;
use xstr::Join;
mod ok;
use ok::ok;
mod curl;
use curl::curl;
mod db;
use db::{Kind, Watch};
mod watch;
use watch::watch;
mod should_send;
use should_send::should_send;
mod err;
use err::errlog;
mod err_duration;
mod hook;
use err_duration::err_duration;
pub async fn id_v(table: &str, id_set: HashSet<u64>) -> Result<HashMap<u64, String>> {
if id_set.is_empty() {
return Ok(Default::default());
}
let li: Vec<(u64, String)> = q!(format!(
"SELECT id,v FROM {table} WHERE id IN ({})",
id_set.join(",")
));
Ok(HashMap::from_iter(li.into_iter()))
}
pub async fn next() -> Result<()> {
let now = sts::sec();
let li: Vec<Watch> = q!(
"SELECT id,host_id,kind_id,dns_type,err,arg_id FROM watch WHERE ts<=?",
now
);
if li.is_empty() {
return OK;
}
let mut kind_set = HashSet::new();
let mut host_set = HashSet::new();
let mut arg_set = HashSet::new();
li.iter().for_each(|w| {
kind_set.insert(w.kind_id);
host_set.insert(w.host_id);
if w.arg_id != 0 {
arg_set.insert(w.arg_id);
}
});
let kind_li: Vec<Kind> = q!(format!(
"SELECT id,arg_id,duration,warnErr,v FROM kind WHERE id IN ({})",
kind_set.join(",")
));
kind_li.iter().for_each(|k| {
if k.arg_id != 0 {
arg_set.insert(k.arg_id);
}
});
let kind_map = HashMap::<u64, Kind>::from_iter(kind_li.into_iter().map(|k| (k.id, k)));
let host_map = id_v("host", host_set).await?;
let arg_map = id_v("arg", arg_set).await?;
let mut ing_curl = FuturesUnordered::new();
let mut ing_hook = FuturesUnordered::new();
for watch in &li {
if let Some(host) = host_map.get(&watch.host_id) {
if let Some(kind) = kind_map.get(&watch.kind_id) {
tracing::info!(
"{} {} IPV{} ERR {}",
kind.v,
host,
watch.dns_type,
watch.err
);
macro_rules! arg {
($type:ident) => {
if $type.arg_id > 0 {
if let Some(s) = arg_map.get(&$type.arg_id) {
s.as_str()
} else {
paste! {
dberr!(
[< $type MissArg >]
"{} watch_id={} arg_id={} kind_id={} kind_arg_id={}",
host,
watch.id,
watch.arg_id,
watch.kind_id,
kind.arg_id
);
}
continue;
}
} else {
""
}
};
}
let kind_arg = arg!(kind);
let watch_arg = arg!(watch);
if let Some(task) = hook(kind, watch, host, kind_arg, watch_arg) {
ing_hook.push(task);
} else {
ing_curl.push(curl(kind, watch, host, kind_arg, watch_arg));
}
} else {
dberr!(WatchMissKind "{} watch_id={} kind_id={}",host, watch.id, watch.kind_id);
}
} else {
dberr!(WatchMissHost "watch_id={} host_id={}", watch.id, watch.host_id);
}
}
macro_rules! log {
($($ing:ident),*) => {
$(
while let Some(r) = $ing.next().await {
if let Err(err) = r {
let title = "出错了";
tracing::error!("{title}:\n{err}");
hi::send(title,err.to_string(),"https://atomgit.com/3ti/rust/blob/main/alive/src/lib.rs#L117").await;
}
}
)*
};
}
log!(ing_hook, ing_curl);
OK
}