use crate::core::cron_source::{self, CronSchedule, CronTime};
use crate::core::metric_source::{self, MetricEvalResult, MetricThreshold, ThresholdTracker};
use crate::core::rules_runtime::{self, EvalResult};
use crate::core::types::{
CooldownTracker, EventType, InfraEvent, Rulebook, RulebookAction, RulebookConfig,
};
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct WatchDaemonConfig {
pub poll_interval_secs: u64,
pub cron_schedules: Vec<(String, String)>,
pub metric_thresholds: Vec<MetricThreshold>,
pub webhook_port: u16,
pub watch_paths: Vec<String>,
pub event_buffer_size: usize,
pub event_logging: bool,
}
impl Default for WatchDaemonConfig {
fn default() -> Self {
Self {
poll_interval_secs: 5,
cron_schedules: Vec::new(),
metric_thresholds: Vec::new(),
webhook_port: 0,
watch_paths: Vec::new(),
event_buffer_size: 1024,
event_logging: true,
}
}
}
#[derive(Debug)]
pub struct DaemonState {
pub cooldown: CooldownTracker,
pub metrics: ThresholdTracker,
pub cron_parsed: Vec<(String, CronSchedule)>,
pub events_processed: u64,
pub actions_dispatched: u64,
pub file_hashes: HashMap<String, String>,
pub shutdown: bool,
}
impl DaemonState {
pub fn new(config: &WatchDaemonConfig) -> Self {
let cron_parsed = config
.cron_schedules
.iter()
.filter_map(|(name, expr)| {
cron_source::parse_cron(expr)
.ok()
.map(|s| (name.clone(), s))
})
.collect();
Self {
cooldown: CooldownTracker::default(),
metrics: ThresholdTracker::default(),
cron_parsed,
events_processed: 0,
actions_dispatched: 0,
file_hashes: HashMap::new(),
shutdown: false,
}
}
}
#[derive(Debug, Clone)]
pub struct ProcessedEvent {
pub event: InfraEvent,
pub eval_results: Vec<EvalResult>,
pub pending_actions: Vec<(String, RulebookAction)>,
}
pub fn process_event(
event: &InfraEvent,
rulebook_config: &RulebookConfig,
state: &mut DaemonState,
) -> ProcessedEvent {
state.events_processed += 1;
let eval_results = rules_runtime::evaluate_event(event, rulebook_config, &mut state.cooldown);
let pending_actions: Vec<(String, RulebookAction)> = eval_results
.iter()
.filter(|r| !r.cooldown_blocked && !r.disabled)
.flat_map(|r| {
r.actions
.iter()
.map(|a| (r.rulebook.clone(), a.clone()))
.collect::<Vec<_>>()
})
.collect();
state.actions_dispatched += pending_actions.len() as u64;
ProcessedEvent {
event: event.clone(),
eval_results,
pending_actions,
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActionKind {
Apply,
Destroy,
Script,
Notify,
Unknown,
}
impl std::fmt::Display for ActionKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Apply => write!(f, "apply"),
Self::Destroy => write!(f, "destroy"),
Self::Script => write!(f, "script"),
Self::Notify => write!(f, "notify"),
Self::Unknown => write!(f, "unknown"),
}
}
}
pub fn classify_action(action: &RulebookAction) -> ActionKind {
if action.apply.is_some() {
ActionKind::Apply
} else if action.destroy.is_some() {
ActionKind::Destroy
} else if action.script.is_some() {
ActionKind::Script
} else if action.notify.is_some() {
ActionKind::Notify
} else {
ActionKind::Unknown
}
}
pub fn check_cron_schedules(state: &DaemonState, time: &CronTime) -> Vec<String> {
state
.cron_parsed
.iter()
.filter(|(_, sched)| cron_source::matches(sched, time))
.map(|(name, _)| name.clone())
.collect()
}
pub fn cron_event(schedule_name: &str, timestamp: &str) -> InfraEvent {
let mut payload = HashMap::new();
payload.insert("schedule".to_string(), schedule_name.to_string());
InfraEvent {
event_type: EventType::CronFired,
timestamp: timestamp.to_string(),
machine: None,
payload,
}
}
pub fn file_changed_event(path: &str, timestamp: &str) -> InfraEvent {
let mut payload = HashMap::new();
payload.insert("path".to_string(), path.to_string());
InfraEvent {
event_type: EventType::FileChanged,
timestamp: timestamp.to_string(),
machine: None,
payload,
}
}
pub fn metric_threshold_event(result: &MetricEvalResult, timestamp: &str) -> InfraEvent {
let mut payload = HashMap::new();
payload.insert("metric".to_string(), result.name.clone());
payload.insert("current".to_string(), result.current.to_string());
payload.insert("threshold".to_string(), result.threshold.to_string());
payload.insert("operator".to_string(), result.operator.to_string());
InfraEvent {
event_type: EventType::MetricThreshold,
timestamp: timestamp.to_string(),
machine: None,
payload,
}
}
pub fn check_metrics(
thresholds: &[MetricThreshold],
values: &HashMap<String, f64>,
state: &mut DaemonState,
) -> Vec<InfraEvent> {
let results = metric_source::evaluate_metrics(thresholds, values, &mut state.metrics);
let ts = timestamp_now();
results
.iter()
.filter(|r| r.should_fire)
.map(|r| metric_threshold_event(r, &ts))
.collect()
}
pub fn detect_file_changes(
paths: &[String],
current_hashes: &HashMap<String, String>,
state: &mut DaemonState,
) -> Vec<String> {
let mut changed = Vec::new();
for path in paths {
let new_hash = current_hashes.get(path);
let old_hash = state.file_hashes.get(path);
match (old_hash, new_hash) {
(Some(old), Some(new)) if old != new => {
changed.push(path.clone());
}
(None, Some(_)) => {
}
_ => {}
}
}
for (path, hash) in current_hashes {
state.file_hashes.insert(path.clone(), hash.clone());
}
changed
}
pub fn format_event_log(event: &InfraEvent, actions_taken: &[(String, ActionKind)]) -> String {
let actions_json: Vec<serde_json::Value> = actions_taken
.iter()
.map(|(rb, kind)| {
serde_json::json!({
"rulebook": rb,
"action": kind.to_string(),
})
})
.collect();
let log_entry = serde_json::json!({
"timestamp": event.timestamp,
"event_type": event.event_type.to_string(),
"machine": event.machine,
"payload": event.payload,
"actions": actions_json,
});
serde_json::to_string(&log_entry).unwrap_or_default()
}
#[derive(Debug, Clone)]
pub struct DaemonSummary {
pub events_processed: u64,
pub actions_dispatched: u64,
pub cron_schedules: usize,
pub watched_paths: usize,
pub metric_thresholds: usize,
pub shutdown: bool,
}
pub fn daemon_summary(config: &WatchDaemonConfig, state: &DaemonState) -> DaemonSummary {
DaemonSummary {
events_processed: state.events_processed,
actions_dispatched: state.actions_dispatched,
cron_schedules: state.cron_parsed.len(),
watched_paths: config.watch_paths.len(),
metric_thresholds: config.metric_thresholds.len(),
shutdown: state.shutdown,
}
}
pub fn build_rulebook_config(rulebooks: Vec<Rulebook>) -> RulebookConfig {
RulebookConfig { rulebooks }
}
fn timestamp_now() -> String {
let dur = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
format!("{}Z", dur.as_secs())
}