//! Drift prediction: scans the event logs recorded under the state directory
//! and ranks (resource, machine) pairs by how likely they are to drift again.

use super::helpers::*;
use std::collections::HashMap;
use std::path::Path;
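
/// Per-(resource, machine) drift statistics derived from the event log,
/// plus a heuristic risk score in the 0.0..=1.0 range.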
struct DriftPrediction {
resource: String,
machine: String,
drift_count: usize,
total_events: usize,
drift_rate: f64,
mean_time_between_drifts: f64,
trend: &'static str,
risk_score: f64,
}
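
/// Aggregate report produced by `compute_predictions`, with predictions
/// sorted by descending risk score.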
struct DriftPredictReport {
predictions: Vec<DriftPrediction>,
total_analyzed: usize,
high_risk_count: usize,
}
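
/// One line from a `.jsonl` event log, reduced to the fields the predictor needs.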
struct EventRecord {
resource: String,
machine: String,
action: String,
timestamp: f64,
}
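
/// Walks `state_dir` recursively, parses every `.jsonl` event log found, and
/// returns the events sorted by ascending timestamp.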
fn parse_events(state_dir: &Path) -> Vec<EventRecord> {
let mut events = Vec::new();
collect_events_recursive(state_dir, &mut events);
events.sort_by(|a, b| {
a.timestamp
.partial_cmp(&b.timestamp)
.unwrap_or(std::cmp::Ordering::Equal)
});
events
}
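
/// Recursively gathers events from `.jsonl` files under `dir`; unreadable
/// directories are silently skipped.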
fn collect_events_recursive(dir: &Path, events: &mut Vec<EventRecord>) {
let entries = match std::fs::read_dir(dir) {
Ok(e) => e,
Err(_) => return,
};
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
collect_events_recursive(&path, events);
} else if path.extension().and_then(|e| e.to_str()) == Some("jsonl") {
parse_events_file(&path, events);
}
}
}
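
/// Parses one JSON-lines event file. The machine name is taken from the
/// `machine` field when present, otherwise derived from a
/// `<machine>.events.jsonl` file name, and finally falls back to "default".
/// Unparsable lines and lines without a `resource` field are skipped.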
fn parse_events_file(path: &Path, events: &mut Vec<EventRecord>) {
let content = std::fs::read_to_string(path).unwrap_or_default();
let machine = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.strip_suffix(".events")
.unwrap_or("default")
.to_string();
for line in content.lines() {
let parsed: serde_json::Value =
serde_json::from_str(line).unwrap_or(serde_json::Value::Null);
let resource = parsed["resource"].as_str().unwrap_or("").to_string();
if resource.is_empty() {
continue;
}
let action = parsed["action"]
.as_str()
.or_else(|| parsed["event"].as_str())
.unwrap_or("apply")
.to_string();
let ts = parsed["timestamp"]
.as_f64()
.or_else(|| parsed["ts"].as_f64())
.or_else(|| parse_iso_timestamp(parsed["timestamp"].as_str()))
.or_else(|| parse_iso_timestamp(parsed["ts"].as_str()))
.unwrap_or(0.0);
let m = parsed["machine"]
.as_str()
.map(|s| s.to_string())
.unwrap_or_else(|| machine.clone());
events.push(EventRecord {
resource,
machine: m,
action,
timestamp: ts,
});
}
}
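
/// Best-effort ISO-8601 parser that avoids a date/time dependency: the string
/// is split on non-digits and the first six fields are read as Y-M-D H:M:S,
/// then converted to Unix seconds. A trailing `Z` or numeric offset is
/// dropped, so offset timestamps are treated as UTC, and fractional seconds
/// are ignored.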
fn parse_iso_timestamp(s: Option<&str>) -> Option<f64> {
let s = s?;
let s = s.trim_end_matches('Z');
    // Strip a trailing numeric UTC offset. The date separators sit at indices
    // 4 and 7, so a '-' belonging to an offset can only appear at index 19 or
    // later (after "YYYY-MM-DDTHH:MM:SS").
    let s = if let Some(idx) = s.rfind('+') {
        &s[..idx]
    } else if let Some(idx) = s.rfind('-') {
        if idx >= 19 {
            &s[..idx]
        } else {
            s
        }
    } else {
        s
    };
let parts: Vec<&str> = s.split(|c: char| !c.is_ascii_digit()).collect();
if parts.len() < 6 {
return None;
}
let y: i64 = parts[0].parse().ok()?;
let mo: i64 = parts[1].parse().ok()?;
let d: i64 = parts[2].parse().ok()?;
let h: i64 = parts[3].parse().ok()?;
let mi: i64 = parts[4].parse().ok()?;
let se: i64 = parts[5].parse().ok()?;
    // Days since the Unix epoch. Leap days are counted with the simple
    // every-four-years rule (no century exceptions), which is exact for
    // 1970 through 2099; the current year's leap day only counts once
    // February has passed.
    let leap_days = (y - 1969) / 4 + if mo > 2 && y % 4 == 0 { 1 } else { 0 };
    let days = (y - 1970) * 365 + leap_days + month_days(mo) + d - 1;
    Some((days * 86400 + h * 3600 + mi * 60 + se) as f64)
}
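
/// Cumulative days before the first of each month in a non-leap year; leap
/// days are added by the caller.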
fn month_days(month: i64) -> i64 {
const CUMULATIVE: [i64; 12] = [0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334];
if (1..=12).contains(&month) {
CUMULATIVE[(month - 1) as usize]
} else {
0
}
}
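
/// Groups events by (resource, machine), optionally filtered to a single
/// machine, and scores each group for drift risk.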
fn compute_predictions(events: &[EventRecord], machine_filter: Option<&str>) -> DriftPredictReport {
let mut groups: HashMap<(String, String), Vec<&EventRecord>> = HashMap::new();
for ev in events {
if let Some(m) = machine_filter {
if ev.machine != m {
continue;
}
}
groups
.entry((ev.resource.clone(), ev.machine.clone()))
.or_default()
.push(ev);
}
let mut predictions = Vec::new();
for ((resource, machine), evts) in &groups {
let total_events = evts.len();
let drift_count = evts.iter().filter(|e| is_drift_event(&e.action)).count();
let drift_rate = if total_events > 0 {
drift_count as f64 / total_events as f64
} else {
0.0
};
let drift_timestamps: Vec<f64> = evts
.iter()
.filter(|e| is_drift_event(&e.action))
.map(|e| e.timestamp)
.collect();
let mean_time_between = compute_mtbd(&drift_timestamps);
let trend = compute_trend(&drift_timestamps);
let risk_score = compute_risk(drift_rate, drift_count, trend);
predictions.push(DriftPrediction {
resource: resource.clone(),
machine: machine.clone(),
drift_count,
total_events,
drift_rate,
mean_time_between_drifts: mean_time_between,
trend,
risk_score,
});
}
predictions.sort_by(|a, b| {
b.risk_score
.partial_cmp(&a.risk_score)
.unwrap_or(std::cmp::Ordering::Equal)
});
let total_analyzed = groups.len();
let high_risk_count = predictions.iter().filter(|p| p.risk_score > 0.7).count();
DriftPredictReport {
predictions,
total_analyzed,
high_risk_count,
}
}
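
/// Event actions that count as drift for prediction purposes.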
fn is_drift_event(action: &str) -> bool {
matches!(
action,
"drift" | "drift_detected" | "resource_drifted" | "remediate" | "changed"
)
}
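
/// Mean time between consecutive drift events, in seconds; 0.0 when there are
/// fewer than two drifts.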
fn compute_mtbd(timestamps: &[f64]) -> f64 {
if timestamps.len() < 2 {
return 0.0;
}
let mut intervals: Vec<f64> = Vec::new();
for w in timestamps.windows(2) {
intervals.push(w[1] - w[0]);
}
intervals.iter().sum::<f64>() / intervals.len() as f64
}
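
/// Classifies whether drift events are arriving faster or slower over time by
/// comparing the mean spacing of the earlier intervals against the later ones.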
fn compute_trend(timestamps: &[f64]) -> &'static str {
    // Need at least three drift timestamps (two intervals) to compare early
    // vs. late spacing.
    if timestamps.len() < 3 {
        return "stable";
    }
    let intervals: Vec<f64> = timestamps.windows(2).map(|w| w[1] - w[0]).collect();
    let mid = intervals.len() / 2;
    let mean = |xs: &[f64]| xs.iter().sum::<f64>() / xs.len() as f64;
    let early = mean(&intervals[..mid]);
    let late = mean(&intervals[mid..]);
    if early <= 0.0 || late <= 0.0 {
        return "stable";
    }
    // Drifts arriving noticeably closer together than before count as an
    // increasing trend; noticeably farther apart as decreasing.
    if late < early * 0.7 {
        "increasing"
    } else if late > early * 1.3 {
        "decreasing"
    } else {
        "stable"
    }
}
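
/// Heuristic risk score: half of the drift rate plus up to 0.3 for the
/// absolute drift count, scaled by the trend and clamped to 1.0.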
fn compute_risk(drift_rate: f64, drift_count: usize, trend: &str) -> f64 {
let base = drift_rate * 0.5 + (drift_count as f64 * 0.05).min(0.3);
let trend_multiplier = match trend {
"increasing" => 1.3,
"decreasing" => 0.7,
_ => 1.0,
};
(base * trend_multiplier).min(1.0)
}
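
/// Entry point for the drift-predict command: parses the event logs under
/// `state_dir`, ranks resources by risk, truncates to `limit` entries when
/// `limit` is non-zero, and prints either JSON or plain text.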
pub(crate) fn cmd_drift_predict(
state_dir: &Path,
machine: Option<&str>,
limit: usize,
json: bool,
) -> Result<(), String> {
let events = parse_events(state_dir);
let mut report = compute_predictions(&events, machine);
if limit > 0 {
report.predictions.truncate(limit);
}
if json {
print_drift_json(&report);
} else {
print_drift_text(&report);
}
Ok(())
}
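
/// Emits the report as a single JSON object. Field values are interpolated
/// verbatim, so resource and machine names are assumed not to contain
/// characters that need JSON escaping.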
fn print_drift_json(r: &DriftPredictReport) {
let items: Vec<String> = r
.predictions
.iter()
.map(|p| {
format!(
r#"{{"resource":"{}","machine":"{}","drift_count":{},"total_events":{},"drift_rate":{:.3},"mtbd":{:.1},"trend":"{}","risk":{:.3}}}"#,
p.resource, p.machine, p.drift_count, p.total_events,
p.drift_rate, p.mean_time_between_drifts, p.trend, p.risk_score
)
})
.collect();
println!(
r#"{{"total_analyzed":{},"high_risk":{},"predictions":[{}]}}"#,
r.total_analyzed,
r.high_risk_count,
items.join(","),
);
}
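
/// Human-readable report: one line per prediction, with the risk percentage
/// color-coded (red above 0.7, yellow above 0.3, green otherwise).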
fn print_drift_text(r: &DriftPredictReport) {
println!("{}\n", bold("Drift Prediction Report"));
println!(" Analyzed: {} resource(s)", r.total_analyzed);
println!(" High risk: {}\n", r.high_risk_count);
for p in &r.predictions {
let risk_str = format!("{:.1}%", p.risk_score * 100.0);
let risk_colored = if p.risk_score > 0.7 {
red(&risk_str)
} else if p.risk_score > 0.3 {
yellow(&risk_str)
} else {
green(&risk_str)
};
println!(
" {} on {} — risk {} | drifts {}/{} | trend {} | mtbd {:.0}s",
p.resource,
p.machine,
risk_colored,
p.drift_count,
p.total_events,
p.trend,
p.mean_time_between_drifts,
);
}
if r.predictions.is_empty() {
println!(
" {} No drift events found in state directory",
dim("Note:")
);
}
}
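
#[cfg(test)]
mod tests {
    // Minimal sanity checks for the pure helpers above. The expected epoch
    // values follow the simplified leap-year handling in parse_iso_timestamp
    // (no century exceptions), not a full calendar library.
    use super::*;

    #[test]
    fn iso_timestamps_convert_to_epoch_seconds() {
        assert_eq!(parse_iso_timestamp(Some("1970-01-01T00:00:00Z")), Some(0.0));
        // A leap day is exactly one day before the following March 1st.
        let feb29 = parse_iso_timestamp(Some("2020-02-29T00:00:00Z")).unwrap();
        let mar01 = parse_iso_timestamp(Some("2020-03-01T00:00:00Z")).unwrap();
        assert_eq!(mar01 - feb29, 86_400.0);
    }

    #[test]
    fn mtbd_is_the_mean_interval() {
        assert_eq!(compute_mtbd(&[0.0, 100.0, 300.0]), 150.0);
        assert_eq!(compute_mtbd(&[42.0]), 0.0);
    }

    #[test]
    fn trend_reflects_interval_spacing() {
        // Intervals shrink from 100s to 50s and 20s: drift is speeding up.
        assert_eq!(compute_trend(&[0.0, 100.0, 150.0, 170.0]), "increasing");
        // Evenly spaced drifts stay stable.
        assert_eq!(compute_trend(&[0.0, 100.0, 200.0, 300.0]), "stable");
    }

    #[test]
    fn risk_combines_rate_count_and_trend() {
        // 0.5 * 0.5 + min(4 * 0.05, 0.3) = 0.45 with a neutral trend.
        assert!((compute_risk(0.5, 4, "stable") - 0.45).abs() < 1e-9);
        assert!(compute_risk(1.0, 10, "increasing") <= 1.0);
    }
}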