use crate::check::discrepancy::{
    Discrepancy, DiscrepancyKind, Severity, TemporalGap, TemporalPatterns,
};
use crate::check::preprocess::ProofCheckFile;
use crate::check::synchrony::parse_timestamp;
use geo::{Distance, Haversine, Point};
use polars::prelude::*;
use std::collections::{HashMap, HashSet};
// |z-score| above which a numeric value counts as a statistical outlier.
const Z_SCORE_THRESHOLD: f64 = 2.0;
// Intervals shorter than this (seconds) between consecutive captures are "bursts".
const BURST_THRESHOLD_SECONDS: f64 = 1.0;
// Window (seconds) within which an IP address change is considered suspicious.
const IP_CHANGE_WINDOW_SECONDS: f64 = 60.0;
// Implied travel speed (km/h) above which movement is treated as physically impossible.
const IMPOSSIBLE_SPEED_KMH: f64 = 1000.0;
/// Flags statistical outliers in the numeric location fields of proof files.
///
/// Gathers latitude, longitude, altitude, and speed from every file that has
/// parsed JSON metadata, then z-score-tests each column against the sample
/// mean/std. Returns a map from file name to the discrepancies found for it;
/// the map is empty when fewer than three files carry JSON metadata (too few
/// samples for a meaningful mean/std).
pub fn detect_location_outliers(
    proof_files: &[ProofCheckFile],
) -> HashMap<String, Vec<Discrepancy>> {
    let mut result: HashMap<String, Vec<Discrepancy>> = HashMap::new();

    // Parses an optional textual field into a float, dropping unparseable values.
    let to_f64 = |s: Option<&str>| s.and_then(|v| v.parse::<f64>().ok());

    let mut names: Vec<String> = Vec::new();
    let mut lats: Vec<Option<f64>> = Vec::new();
    let mut lons: Vec<Option<f64>> = Vec::new();
    let mut alts: Vec<Option<f64>> = Vec::new();
    let mut speeds: Vec<Option<f64>> = Vec::new();

    for file in proof_files {
        let Some(json) = &file.json else { continue };
        names.push(file.name.clone());
        lats.push(json.location.latitude);
        lons.push(json.location.longitude);
        alts.push(to_f64(json.location.altitude.as_deref()));
        speeds.push(to_f64(json.location.speed.as_deref()));
    }

    if names.len() < 3 {
        return result;
    }

    let Ok(df) = DataFrame::new(vec![
        Series::new("lat".into(), &lats).into(),
        Series::new("lon".into(), &lons).into(),
        Series::new("alt".into(), &alts).into(),
        Series::new("speed".into(), &speeds).into(),
    ]) else {
        return result;
    };

    // (column name, fully qualified JSON field name for messages)
    for (col_name, field_name) in [
        ("lat", "location.latitude"),
        ("lon", "location.longitude"),
        ("alt", "location.altitude"),
        ("speed", "location.speed"),
    ] {
        flag_outliers_in_column(&df, &names, col_name, field_name, &mut result);
    }
    result
}
/// Z-score-tests one f64 DataFrame column and records outliers into `result`.
///
/// For each non-null value whose |z| exceeds `Z_SCORE_THRESHOLD`, appends a
/// Warning/Outlier discrepancy under the file name at the same row index.
/// Bails out silently when the column is missing, not f64, empty, or has no
/// spread (zero/absent std makes z-scores meaningless).
fn flag_outliers_in_column(
    df: &DataFrame,
    names: &[String],
    col_name: &str,
    field_name: &str,
    result: &mut HashMap<String, Vec<Discrepancy>>,
) {
    let Ok(col) = df.column(col_name) else { return };
    let Ok(ca) = col.as_materialized_series().f64() else { return };
    let Some(mean) = ca.mean() else { return };
    // Sample (ddof = 1) standard deviation.
    let std = match ca.std(1) {
        Some(s) if s > 0.0 => s,
        _ => return,
    };

    for (i, opt_val) in ca.into_iter().enumerate() {
        let Some(val) = opt_val else { continue };
        let z = (val - mean) / std;
        if z.abs() <= Z_SCORE_THRESHOLD {
            continue;
        }
        result.entry(names[i].clone()).or_default().push(Discrepancy {
            field: field_name.to_string(),
            severity: Severity::Warning,
            kind: DiscrepancyKind::Outlier,
            message: format!(
                "{} value {:.6} is a statistical outlier (z-score: {:.2}, mean: {:.6}, std: {:.6})",
                field_name, val, z, mean, std
            ),
        });
    }
}
pub fn analyze_temporal_patterns(proof_files: &[ProofCheckFile]) -> Option<TemporalPatterns> {
let mut entries: Vec<(String, f64)> = Vec::new();
for file in proof_files {
if let Some(json) = &file.json {
let time_str = json
.location
.time
.as_deref()
.or(json.file.created_at.as_deref());
if let Some(ts) = time_str {
if let Some(dt) = parse_timestamp(ts) {
let ms = dt.and_utc().timestamp_millis() as f64;
entries.push((file.name.clone(), ms));
}
}
}
}
if entries.len() < 2 {
return None;
}
entries.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
let mut intervals: Vec<f64> = Vec::new();
let mut interval_files: Vec<(String, String)> = Vec::new();
for i in 1..entries.len() {
let interval_s = (entries[i].1 - entries[i - 1].1) / 1000.0;
intervals.push(interval_s);
interval_files.push((entries[i - 1].0.clone(), entries[i].0.clone()));
}
let interval_series = Series::new("interval".into(), &intervals);
let ca = interval_series.f64().ok()?;
let mean = ca.mean()?;
let std = ca.std(1).unwrap_or(0.0);
let min_val = ca.min()?;
let max_val = ca.max()?;
let gap_threshold = if std > 0.0 {
mean + Z_SCORE_THRESHOLD * std
} else {
f64::MAX
};
let mut burst_count = 0;
let mut gap_count = 0;
let mut anomalous_intervals = Vec::new();
for (i, &interval) in intervals.iter().enumerate() {
let z_score = if std > 0.0 {
(interval - mean) / std
} else {
0.0
};
if interval < BURST_THRESHOLD_SECONDS {
burst_count += 1;
anomalous_intervals.push(TemporalGap {
file_before: interval_files[i].0.clone(),
file_after: interval_files[i].1.clone(),
interval_seconds: interval,
z_score,
});
} else if interval > gap_threshold {
gap_count += 1;
anomalous_intervals.push(TemporalGap {
file_before: interval_files[i].0.clone(),
file_after: interval_files[i].1.clone(),
interval_seconds: interval,
z_score,
});
}
}
Some(TemporalPatterns {
mean_interval_seconds: mean,
std_interval_seconds: std,
min_interval_seconds: min_val,
max_interval_seconds: max_val,
burst_count,
gap_count,
anomalous_intervals,
})
}
/// One proof file's per-device network/location snapshot, used when scanning
/// for rapid IP changes and physically impossible travel.
struct NetworkEntry {
    // Name of the proof file this snapshot came from.
    name: String,
    // Capture time in epoch milliseconds; 0.0 when no timestamp could be parsed.
    timestamp_ms: f64,
    lat: Option<f64>,
    lon: Option<f64>,
    ipv4: Option<String>,
    ipv6: Option<String>,
}
pub fn detect_network_anomalies(
proof_files: &[ProofCheckFile],
) -> HashMap<String, Vec<Discrepancy>> {
let mut result: HashMap<String, Vec<Discrepancy>> = HashMap::new();
let mut device_entries: HashMap<String, Vec<NetworkEntry>> = HashMap::new();
for file in proof_files {
if let Some(json) = &file.json {
let device_id = json
.device
.id
.clone()
.unwrap_or_else(|| "unknown".to_string());
let time_str = json
.location
.time
.as_deref()
.or(json.file.created_at.as_deref());
let timestamp_ms = time_str
.and_then(parse_timestamp)
.map(|dt| dt.and_utc().timestamp_millis() as f64)
.unwrap_or(0.0);
device_entries
.entry(device_id)
.or_default()
.push(NetworkEntry {
name: file.name.clone(),
timestamp_ms,
lat: json.location.latitude,
lon: json.location.longitude,
ipv4: json.network.ipv4.clone(),
ipv6: json.network.ipv6.clone(),
});
}
}
for (device_id, mut entries) in device_entries {
if entries.len() < 2 {
continue;
}
entries.sort_by(|a, b| {
a.timestamp_ms
.partial_cmp(&b.timestamp_ms)
.unwrap_or(std::cmp::Ordering::Equal)
});
let timestamps: Vec<f64> = entries.iter().map(|e| e.timestamp_ms).collect();
let ipv4s: Vec<Option<&str>> = entries.iter().map(|e| e.ipv4.as_deref()).collect();
if let Ok(df) = DataFrame::new(vec![
Series::new("ts".into(), ×tamps).into(),
Series::new("ipv4".into(), &ipv4s).into(),
]) {
if let Ok(ip_col) = df.column("ipv4") {
let n_unique = ip_col.n_unique().unwrap_or(0);
if n_unique > 1 {
check_consecutive_network_changes(&entries, &device_id, &mut result);
}
}
}
check_implied_speed(&entries, &device_id, &mut result);
}
result
}
/// Flags IP addresses that flip between two captures taken less than
/// `IP_CHANGE_WINDOW_SECONDS` apart (with a positive time delta).
///
/// A change only counts when both captures report an address for that
/// family — a None on either side never matches a Some. Discrepancies are
/// recorded under the device id.
fn check_consecutive_network_changes(
    entries: &[NetworkEntry],
    device_id: &str,
    result: &mut HashMap<String, Vec<Discrepancy>>,
) {
    for pair in entries.windows(2) {
        let (prev, curr) = (&pair[0], &pair[1]);
        let time_delta_s = (curr.timestamp_ms - prev.timestamp_ms) / 1000.0;
        if time_delta_s <= 0.0 || time_delta_s >= IP_CHANGE_WINDOW_SECONDS {
            continue;
        }
        // Both address families follow the same reporting shape; handle them
        // in a single pass over (field, label, before, after) tuples.
        let families = [
            ("network.ipv4", "IPv4", prev.ipv4.as_deref(), curr.ipv4.as_deref()),
            ("network.ipv6", "IPv6", prev.ipv6.as_deref(), curr.ipv6.as_deref()),
        ];
        for (field, label, old_ip, new_ip) in families {
            if old_ip.is_none() || new_ip.is_none() || old_ip == new_ip {
                continue;
            }
            result
                .entry(device_id.to_string())
                .or_default()
                .push(Discrepancy {
                    field: field.to_string(),
                    severity: Severity::Warning,
                    kind: DiscrepancyKind::Mismatch,
                    message: format!(
                        "{} changed from {} to {} within {:.0}s between {} and {}",
                        label,
                        old_ip.unwrap_or("none"),
                        new_ip.unwrap_or("none"),
                        time_delta_s,
                        prev.name,
                        curr.name,
                    ),
                });
        }
    }
}
/// Flags physically impossible travel between consecutive captures.
///
/// Computes the great-circle (haversine) distance between each pair of
/// consecutive fixes and the speed it implies; anything above
/// `IMPOSSIBLE_SPEED_KMH` becomes an Error-severity spatial anomaly keyed by
/// device id. Pairs with a non-positive time delta or missing coordinates
/// are skipped.
fn check_implied_speed(
    entries: &[NetworkEntry],
    device_id: &str,
    result: &mut HashMap<String, Vec<Discrepancy>>,
) {
    for pair in entries.windows(2) {
        let (prev, curr) = (&pair[0], &pair[1]);
        let time_delta_s = (curr.timestamp_ms - prev.timestamp_ms) / 1000.0;
        if time_delta_s <= 0.0 {
            continue;
        }
        // All four coordinates must be present to measure displacement.
        let coords = prev.lat.zip(prev.lon).zip(curr.lat.zip(curr.lon));
        let Some(((lat1, lon1), (lat2, lon2))) = coords else {
            continue;
        };
        // geo Points take (x, y) = (longitude, latitude).
        let start = Point::new(lon1, lat1);
        let end = Point::new(lon2, lat2);
        let distance_km = Haversine.distance(start, end) / 1000.0;
        let speed_kmh = distance_km / (time_delta_s / 3600.0);
        if speed_kmh <= IMPOSSIBLE_SPEED_KMH {
            continue;
        }
        result
            .entry(device_id.to_string())
            .or_default()
            .push(Discrepancy {
                field: "location".to_string(),
                severity: Severity::Error,
                kind: DiscrepancyKind::SpatialAnomaly,
                message: format!(
                    "Implied speed {:.0} km/h between {} and {} ({:.1} km in {:.0}s)",
                    speed_kmh, prev.name, curr.name, distance_km, time_delta_s,
                ),
            });
    }
}