use noxu_dbi::EnvironmentImpl;
use noxu_engine::VerifyConfig;
use noxu_sync::Mutex;
use noxu_util::daemon::DaemonThread;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Handle to the background verifier: a named daemon thread that checks a
/// cron schedule on every tick and runs a verification pass when it matches.
pub struct VerifyDaemon {
    /// The underlying daemon thread; consumed by [`VerifyDaemon::stop`].
    daemon: DaemonThread,
}

impl VerifyDaemon {
    /// Starts the verifier with a 60-second tick.
    ///
    /// Equivalent to [`VerifyDaemon::start_with_tick`] with
    /// `Duration::from_secs(60)`.
    pub fn start(
        env_impl: Arc<Mutex<EnvironmentImpl>>,
        schedule: CronSchedule,
        config: VerifyConfig,
    ) -> Self {
        let default_tick = Duration::from_secs(60);
        Self::start_with_tick(env_impl, schedule, config, default_tick)
    }

    /// Starts the verifier, handing `tick` to the daemon thread.
    ///
    /// On each invocation the daemon body reads the wall clock, asks
    /// `should_run_now` whether `schedule` matches and has not already fired
    /// in the current minute, and if so runs one verification pass over
    /// `env_impl` using `config`.
    pub fn start_with_tick(
        env_impl: Arc<Mutex<EnvironmentImpl>>,
        schedule: CronSchedule,
        config: VerifyConfig,
        tick: Duration,
    ) -> Self {
        // `u64::MAX` cannot equal `epoch_secs / 60`, so the very first
        // matching tick is never mistaken for a repeat.
        let last_run_minute = Arc::new(AtomicU64::new(u64::MAX));

        let on_tick = move || {
            // A clock that reads earlier than the Unix epoch is treated as 0.
            let now_secs = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map_or(0, |elapsed| elapsed.as_secs());

            if should_run_now(&schedule, now_secs, &last_run_minute) {
                run_once(&env_impl, &config);
            }

            // NOTE(review): presumably `true` tells `DaemonThread` to keep
            // ticking — confirm against `DaemonThread::spawn`.
            true
        };

        Self {
            daemon: DaemonThread::spawn("noxu-verifier", tick, on_tick),
        }
    }

    /// Shuts the daemon thread down, consuming the handle.
    pub fn stop(self) {
        self.daemon.shutdown();
    }
}
/// Decides whether a verification pass should start at `now_epoch_secs`.
///
/// Returns `true` only when `schedule` matches the given instant AND the
/// minute index (`now_epoch_secs / 60`) differs from the one recorded in
/// `last_run_minute`. When the schedule matches, the current minute index is
/// stored into `last_run_minute`, so later calls within the same minute
/// return `false`. When the schedule does not match, `last_run_minute` is
/// left untouched.
fn should_run_now(
    schedule: &CronSchedule,
    now_epoch_secs: u64,
    last_run_minute: &AtomicU64,
) -> bool {
    let current_minute = now_epoch_secs / 60;
    // `&&` short-circuits: the swap (and thus the update of the recorded
    // minute) only happens once the schedule has matched.
    schedule.matches_epoch_secs(now_epoch_secs)
        && last_run_minute.swap(current_minute, Ordering::Relaxed) != current_minute
}
/// Runs a single verification pass over every database of the environment
/// and logs a summary.
///
/// The pass is skipped entirely when the environment lock cannot be taken
/// with `try_lock`. Otherwise each database is read-locked in turn, verified
/// with `verify_database_impl`, and — when a utilization tracker is
/// available — checked with `check_lsns_against_tracker`. Per-database
/// results are folded into one merged result; iteration stops once the
/// merged error count reaches `config.max_errors`.
///
/// A pass with errors is logged at `warn`, a clean pass at `debug`.
fn run_once(env_impl: &Arc<Mutex<EnvironmentImpl>>, config: &VerifyConfig) {
    // Non-blocking acquisition: if the lock is held elsewhere, give up on
    // this pass instead of waiting.
    let env = match env_impl.try_lock() {
        None => return,
        Some(locked) => locked,
    };

    let databases = env.get_all_database_impls();
    // Lock the tracker (if any) once for the whole pass.
    let tracker = env.get_utilization_tracker().map(|t| t.lock());
    let error_cap = config.max_errors as usize;
    let mut merged = noxu_engine::VerifyResult::new();

    for db in &databases {
        let db_guard = db.read();
        let result = noxu_engine::verify_database_impl(&db_guard, config);

        merged.databases_verified += result.databases_verified;
        merged.records_verified += result.records_verified;

        // Copy errors across one at a time so the cap is honoured mid-database.
        for err in result.errors {
            merged.add_error(err);
            if merged.error_count() >= error_cap {
                break;
            }
        }

        if let Some(t) = tracker.as_ref() {
            noxu_engine::check_lsns_against_tracker(&db_guard, t, &mut merged);
        }

        if merged.error_count() >= error_cap {
            break;
        }
    }

    let error_total = merged.error_count();
    if error_total == 0 {
        log::debug!(
            "background verifier: OK ({} database(s), {} records)",
            merged.databases_verified,
            merged.records_verified,
        );
        return;
    }

    log::warn!(
        "background verifier found {} error(s) across {} database(s) \
         ({} records verified): {:?}",
        error_total,
        merged.databases_verified,
        merged.records_verified,
        merged.errors,
    );
}
/// A parsed five-field cron expression: minute, hour, day-of-month, month,
/// day-of-week.
#[derive(Debug, Clone)]
pub struct CronSchedule {
    /// Allowed minutes, 0–59.
    minute: CronField,
    /// Allowed hours, 0–23.
    hour: CronField,
    /// Allowed days of the month, 1–31.
    dom: CronField,
    /// Allowed months, 1–12.
    month: CronField,
    /// Allowed days of the week, 0–6.
    dow: CronField,
}

impl CronSchedule {
    /// Parses a whitespace-separated, five-field cron expression.
    ///
    /// Returns `None` when the number of fields is not exactly five or when
    /// any field fails to parse within its allowed range.
    pub fn parse(s: &str) -> Option<CronSchedule> {
        let fields: Vec<&str> = s.split_whitespace().collect();
        match fields.as_slice() {
            [minute, hour, dom, month, dow] => Some(CronSchedule {
                minute: CronField::parse(minute, 0, 59)?,
                hour: CronField::parse(hour, 0, 23)?,
                dom: CronField::parse(dom, 1, 31)?,
                month: CronField::parse(month, 1, 12)?,
                dow: CronField::parse(dow, 0, 6)?,
            }),
            _ => None,
        }
    }

    /// Reports whether the instant `epoch_secs` (seconds since the Unix
    /// epoch, broken down by `utc_fields`) satisfies this schedule.
    ///
    /// Minute, hour and month must all match; the day-of-month and
    /// day-of-week fields are combined by `cron_day_match`.
    pub fn matches_epoch_secs(&self, epoch_secs: u64) -> bool {
        let (minute, hour, day_of_month, month, day_of_week) = utc_fields(epoch_secs);

        let time_matches = self.minute.matches(minute) && self.hour.matches(hour);
        let month_matches = self.month.matches(month);

        time_matches
            && month_matches
            && cron_day_match(&self.dom, day_of_month, &self.dow, day_of_week)
    }
}
/// Combines the day-of-month and day-of-week fields of a schedule.
///
/// * both fields are wildcards: every day matches;
/// * exactly one is a wildcard: only the other field decides;
/// * neither is a wildcard: the day matches when EITHER field matches.
///
/// `d` is the day of the month and `w` the day of the week being tested.
fn cron_day_match(dom: &CronField, d: u32, dow: &CronField, w: u32) -> bool {
    if dom.is_wildcard && dow.is_wildcard {
        return true;
    }
    if dow.is_wildcard {
        // Only the day-of-month field is restricted.
        return dom.matches(d);
    }
    if dom.is_wildcard {
        // Only the day-of-week field is restricted.
        return dow.matches(w);
    }
    // Both restricted: OR semantics.
    dom.matches(d) || dow.matches(w)
}
/// One field of a cron expression, stored as a bitmask of allowed values.
#[derive(Debug, Clone)]
struct CronField {
    /// Bit `i` is set when the value `min + i` is allowed.
    allowed: u64,
    /// Smallest legal value of the field; it corresponds to bit 0 of `allowed`.
    min: u32,
    /// `true` only when the field text was exactly `*` (so `*/2` is NOT a
    /// wildcard for this purpose).
    is_wildcard: bool,
}

impl CronField {
    /// Parses one cron field whose legal values are `min..=max`.
    ///
    /// The field is a comma-separated list of parts. Each part is `*`, a
    /// single number `N`, or a range `A-B`, optionally followed by `/STEP`
    /// with `STEP > 0`. A bare `N/STEP` covers only `N`.
    ///
    /// Returns `None` when a number or step fails to parse, the step is
    /// zero, a range is reversed or falls outside `min..=max`, or when
    /// `min..=max` itself does not fit the 64-bit mask.
    fn parse(field: &str, min: u32, max: u32) -> Option<CronField> {
        // Every value is stored as bit `v - min` of a `u64`; refuse bounds
        // for which that shift would be out of range.
        if max < min || max - min >= 64 {
            return None;
        }

        let is_wildcard = field == "*";
        let mut allowed: u64 = 0;

        for part in field.split(',') {
            let (range_part, step) = match part.split_once('/') {
                Some((r, s)) => (r, s.parse::<u32>().ok().filter(|n| *n > 0)?),
                None => (part, 1),
            };

            let (lo, hi) = if range_part == "*" {
                (min, max)
            } else if let Some((a, b)) = range_part.split_once('-') {
                (a.parse::<u32>().ok()?, b.parse::<u32>().ok()?)
            } else {
                let v = range_part.parse::<u32>().ok()?;
                (v, v)
            };

            if lo < min || hi > max || lo > hi {
                return None;
            }

            // `step_by` stops cleanly at `hi`; a hand-rolled `v += step`
            // overflows `u32` for very large steps (e.g. `1/4294967295`).
            // `u32 -> usize` is a widening (or same-width) cast here.
            for v in (lo..=hi).step_by(step as usize) {
                allowed |= 1u64 << (v - min);
            }
        }

        Some(CronField { allowed, min, is_wildcard })
    }

    /// Reports whether `value` is allowed by this field.
    ///
    /// Values below `min`, or further than 63 above it, never match.
    fn matches(&self, value: u32) -> bool {
        if value < self.min {
            return false;
        }
        let bit = value - self.min;
        bit < 64 && (self.allowed & (1u64 << bit)) != 0
    }
}
/// Breaks a Unix timestamp (seconds since 1970-01-01T00:00:00Z) into the
/// UTC calendar fields a cron schedule is matched against.
///
/// Returns `(minute, hour, day_of_month, month, day_of_week)` where
/// `minute` is 0–59, `hour` 0–23, `day_of_month` 1–31, `month` 1–12 and
/// `day_of_week` 0–6 with 0 = Sunday (day 0 of the epoch maps to 4, Thursday).
fn utc_fields(epoch_secs: u64) -> (u32, u32, u32, u32, u32) {
    const SECS_PER_DAY: u64 = 86_400;
    const DAYS_PER_ERA: i64 = 146_097; // one 400-year Gregorian cycle

    let days = (epoch_secs / SECS_PER_DAY) as i64;
    let secs_of_day = epoch_secs % SECS_PER_DAY;

    // Time of day.
    let hour = (secs_of_day / 3_600) as u32;
    let minute = (secs_of_day % 3_600 / 60) as u32;

    // Epoch day 0 was a Thursday, hence the offset of 4.
    let dow = (days + 4).rem_euclid(7) as u32;

    // Days-to-civil conversion on a calendar shifted to start on 1 March of
    // year 0, so that the leap day falls at the end of the shifted year.
    let z = days + 719_468;
    let era = z.div_euclid(DAYS_PER_ERA);
    let doe = z.rem_euclid(DAYS_PER_ERA); // day of era, 0..=146_096
    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of shifted year
    let mp = (5 * doy + 2) / 153; // shifted month, 0 = March
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = if mp < 10 { mp + 3 } else { mp - 9 } as u32;

    (minute, hour, day, month, dow)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// 2021-01-01T00:00:00Z (a Friday): the reference instant for all tests.
    const MIDNIGHT: u64 = 1_609_459_200;
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3_600;
    const DAY: u64 = 86_400;

    #[test]
    fn utc_fields_known_instant() {
        let (min, hour, dom, month, dow) = utc_fields(MIDNIGHT);
        assert_eq!((min, hour, dom, month), (0, 0, 1, 1));
        assert_eq!(dow, 5, "2021-01-01 is a Friday (cron dow 5)");
    }

    #[test]
    fn daily_midnight_matches_only_at_midnight() {
        let schedule = CronSchedule::parse("0 0 * * *").unwrap();
        assert!(schedule.matches_epoch_secs(MIDNIGHT));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + MINUTE));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + HOUR));
    }

    #[test]
    fn every_six_hours_step() {
        let schedule = CronSchedule::parse("0 */6 * * *").unwrap();
        assert!(schedule.matches_epoch_secs(MIDNIGHT));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + HOUR));
        assert!(schedule.matches_epoch_secs(MIDNIGHT + 6 * HOUR));
        assert!(schedule.matches_epoch_secs(MIDNIGHT + 12 * HOUR));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + 7 * HOUR));
    }

    #[test]
    fn exact_minute_hour() {
        let schedule = CronSchedule::parse("30 2 * * *").unwrap();
        assert!(schedule.matches_epoch_secs(MIDNIGHT + 2 * HOUR + 30 * MINUTE));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + 2 * HOUR));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + 3 * HOUR + 30 * MINUTE));
    }

    #[test]
    fn range_and_list_fields() {
        let schedule = CronSchedule::parse("0,15 9-17 * * *").unwrap();
        assert!(schedule.matches_epoch_secs(MIDNIGHT + 9 * HOUR));
        assert!(schedule.matches_epoch_secs(MIDNIGHT + 9 * HOUR + 15 * MINUTE));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + 9 * HOUR + 30 * MINUTE));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + 8 * HOUR));
        assert!(schedule.matches_epoch_secs(MIDNIGHT + 17 * HOUR));
        assert!(!schedule.matches_epoch_secs(MIDNIGHT + 18 * HOUR));
    }

    #[test]
    fn rejects_malformed() {
        // Empty input, too few fields, out-of-range values, non-numeric text.
        assert!(CronSchedule::parse("").is_none());
        assert!(CronSchedule::parse("0 0 * *").is_none());
        assert!(CronSchedule::parse("60 0 * * *").is_none());
        assert!(CronSchedule::parse("0 24 * * *").is_none());
        assert!(CronSchedule::parse("x 0 * * *").is_none());
    }

    #[test]
    fn dom_dow_or_semantics() {
        // Day-of-month 1 OR Monday.
        let schedule = CronSchedule::parse("0 0 1 * 1").unwrap();

        // Friday the 1st: matches through day-of-month.
        assert!(schedule.matches_epoch_secs(MIDNIGHT));

        // Monday the 4th: matches through day-of-week.
        let jan4 = MIDNIGHT + 3 * DAY;
        assert!(schedule.matches_epoch_secs(jan4));

        // Tuesday the 5th: neither field matches.
        let jan5 = MIDNIGHT + 4 * DAY;
        assert!(!schedule.matches_epoch_secs(jan5));
    }

    #[test]
    fn should_run_now_fires_once_per_matching_minute() {
        let schedule = CronSchedule::parse("* * * * *").unwrap();
        let last = AtomicU64::new(u64::MAX);
        assert!(should_run_now(&schedule, MIDNIGHT, &last));
        assert!(!should_run_now(&schedule, MIDNIGHT + 30, &last));
        assert!(should_run_now(&schedule, MIDNIGHT + MINUTE, &last));
    }

    #[test]
    fn should_run_now_skips_non_matching_minute() {
        let schedule = CronSchedule::parse("30 2 * * *").unwrap();
        let last = AtomicU64::new(u64::MAX);
        assert!(!should_run_now(&schedule, MIDNIGHT, &last));
        assert!(should_run_now(&schedule, MIDNIGHT + 2 * HOUR + 30 * MINUTE, &last));
    }
}