use crate::error::M1ndResult;
use crate::types::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::path::Path;
pub const DEFAULT_MAX_OBSERVATIONS: usize = 256;
pub const MIN_OBSERVATIONS_FOR_ACCELERATION: usize = 3;
pub const MIN_OBSERVATIONS_FOR_VELOCITY: usize = 2;
pub const MIN_OBSERVATION_GAP_SECS: f64 = 1.0;
pub const MAGNITUDE_CAP: f32 = 100.0;
pub const DEFAULT_THRESHOLD: f32 = 0.1;
pub const DEFAULT_TOP_K: usize = 20;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TremorObservation {
pub timestamp: f64,
pub weight_delta: f32,
pub edge_events: u16,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub enum TremorDirection {
Accelerating,
Decelerating,
Stable,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
pub enum RiskLevel {
Critical,
High,
Medium,
Low,
Unknown,
}
#[derive(Clone, Debug, Serialize)]
pub struct TremorAlert {
pub node_id: String,
pub label: String,
pub magnitude: f32,
pub direction: TremorDirection,
pub mean_acceleration: f32,
pub trend_slope: f32,
pub observation_count: usize,
pub window_start: f64,
pub window_end: f64,
pub latest_velocity: f32,
pub previous_velocity: f32,
pub risk_level: RiskLevel,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TremorWindow {
Days7,
Days30,
Days90,
All,
}
impl std::str::FromStr for TremorWindow {
type Err = std::convert::Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"7d" => Self::Days7,
"30d" => Self::Days30,
"90d" => Self::Days90,
_ => Self::All,
})
}
}
impl TremorWindow {
pub fn seconds(&self) -> Option<f64> {
match self {
Self::Days7 => Some(7.0 * 86400.0),
Self::Days30 => Some(30.0 * 86400.0),
Self::Days90 => Some(90.0 * 86400.0),
Self::All => None,
}
}
}
#[derive(Clone, Debug, Serialize)]
pub struct TremorResult {
pub tremors: Vec<TremorAlert>,
pub window: String,
pub threshold: f32,
pub total_nodes_analyzed: u32,
pub nodes_with_sufficient_data: u32,
pub elapsed_ms: f64,
}
#[derive(Clone, Debug, Default)]
pub struct TremorRegistry {
observations: HashMap<String, VecDeque<TremorObservation>>,
max_observations: usize,
}
impl TremorRegistry {
pub fn new(max_observations: usize) -> Self {
Self {
observations: HashMap::new(),
max_observations,
}
}
pub fn with_defaults() -> Self {
Self::new(DEFAULT_MAX_OBSERVATIONS)
}
pub fn record_observation(
&mut self,
external_id: &str,
weight_delta: f32,
edge_events: u16,
timestamp: f64,
) {
let queue = self
.observations
.entry(external_id.to_string())
.or_default();
while queue.len() >= self.max_observations {
queue.pop_front();
}
queue.push_back(TremorObservation {
timestamp,
weight_delta,
edge_events,
});
}
pub fn analyze(
&self,
window: TremorWindow,
threshold: f32,
top_k: usize,
node_filter: Option<&str>,
now: f64,
min_observations: usize,
) -> TremorResult {
let start = std::time::Instant::now();
let window_seconds = window.seconds();
let window_start = match window_seconds {
Some(secs) => now - secs,
None => f64::NEG_INFINITY,
};
let window_str = match window {
TremorWindow::Days7 => "7d",
TremorWindow::Days30 => "30d",
TremorWindow::Days90 => "90d",
TremorWindow::All => "all",
};
let mut total_nodes_analyzed = 0u32;
let mut nodes_with_sufficient_data = 0u32;
let mut alerts: Vec<TremorAlert> = Vec::new();
for (external_id, queue) in &self.observations {
if let Some(filter) = node_filter {
if !external_id.contains(filter) {
continue;
}
}
total_nodes_analyzed += 1;
let mut obs: Vec<&TremorObservation> = queue
.iter()
.filter(|o| o.timestamp >= window_start)
.collect();
obs.sort_by(|a, b| {
a.timestamp
.partial_cmp(&b.timestamp)
.unwrap_or(std::cmp::Ordering::Equal)
});
let mut filtered: Vec<&TremorObservation> = Vec::new();
for o in &obs {
if let Some(last) = filtered.last() {
if o.timestamp - last.timestamp < MIN_OBSERVATION_GAP_SECS {
continue;
}
}
filtered.push(o);
}
let effective_min_obs = if min_observations > 0 {
min_observations
} else {
MIN_OBSERVATIONS_FOR_ACCELERATION
};
if filtered.len() < effective_min_obs {
continue;
}
nodes_with_sufficient_data += 1;
let mut velocities: Vec<f32> = Vec::with_capacity(filtered.len() - 1);
let mut vel_times: Vec<f64> = Vec::with_capacity(filtered.len() - 1);
for i in 1..filtered.len() {
let dt = (filtered[i].timestamp - filtered[i - 1].timestamp)
.max(MIN_OBSERVATION_GAP_SECS);
let v = filtered[i].weight_delta / dt as f32;
velocities.push(v);
vel_times.push(filtered[i].timestamp);
}
if velocities.len() < 2 {
continue;
}
let mut accelerations: Vec<f32> = Vec::with_capacity(velocities.len() - 1);
let mut accel_times: Vec<f64> = Vec::with_capacity(velocities.len() - 1);
for i in 1..velocities.len() {
let dt = (vel_times[i] - vel_times[i - 1]).max(MIN_OBSERVATION_GAP_SECS);
let a = (velocities[i] - velocities[i - 1]) / dt as f32;
accelerations.push(a);
accel_times.push(vel_times[i]);
}
if accelerations.is_empty() {
continue;
}
let mean_a: f32 = accelerations.iter().sum::<f32>() / accelerations.len() as f32;
let trend_slope = if accelerations.len() >= 2 {
linear_regression_slope(&accel_times, &accelerations)
} else {
0.0
};
let total_edge_events: u32 = filtered.iter().map(|o| o.edge_events as u32).sum();
let magnitude = (mean_a.abs() * (total_edge_events as f32).sqrt()).min(MAGNITUDE_CAP);
if magnitude < threshold {
continue;
}
let direction = if mean_a > 0.001 {
TremorDirection::Accelerating
} else if mean_a < -0.001 {
TremorDirection::Decelerating
} else {
TremorDirection::Stable
};
let risk_level = if magnitude > 5.0 && trend_slope > 0.5 {
RiskLevel::Critical
} else if magnitude > 2.0 || trend_slope > 0.3 {
RiskLevel::High
} else if magnitude > 0.5 {
RiskLevel::Medium
} else {
RiskLevel::Low
};
let label = external_id
.rsplit("::")
.next()
.unwrap_or(external_id)
.to_string();
let actual_window_start = filtered.first().map(|o| o.timestamp).unwrap_or(now);
let actual_window_end = filtered.last().map(|o| o.timestamp).unwrap_or(now);
let latest_velocity = *velocities.last().unwrap_or(&0.0);
let previous_velocity = if velocities.len() >= 2 {
velocities[velocities.len() - 2]
} else {
0.0
};
alerts.push(TremorAlert {
node_id: external_id.clone(),
label,
magnitude,
direction,
mean_acceleration: mean_a,
trend_slope,
observation_count: filtered.len(),
window_start: actual_window_start,
window_end: actual_window_end,
latest_velocity,
previous_velocity,
risk_level,
});
}
alerts.sort_by(|a, b| {
b.magnitude
.partial_cmp(&a.magnitude)
.unwrap_or(std::cmp::Ordering::Equal)
});
alerts.truncate(top_k);
let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
TremorResult {
tremors: alerts,
window: window_str.to_string(),
threshold,
total_nodes_analyzed,
nodes_with_sufficient_data,
elapsed_ms,
}
}
pub fn observation_count(&self, external_id: &str) -> usize {
self.observations.get(external_id).map_or(0, |q| q.len())
}
}
#[derive(Serialize, Deserialize)]
struct TremorPersistenceFormat {
version: u32,
nodes: HashMap<String, Vec<TremorObservation>>,
}
pub fn save_tremor_state(registry: &TremorRegistry, path: &Path) -> M1ndResult<()> {
let format = TremorPersistenceFormat {
version: 1,
nodes: registry
.observations
.iter()
.map(|(k, v)| (k.clone(), v.iter().cloned().collect()))
.collect(),
};
let json = serde_json::to_string_pretty(&format).map_err(crate::error::M1ndError::Serde)?;
let temp_path = path.with_extension("tmp");
{
use std::io::Write;
let file = std::fs::File::create(&temp_path)?;
let mut writer = std::io::BufWriter::new(file);
writer.write_all(json.as_bytes())?;
writer.flush()?;
}
std::fs::rename(&temp_path, path)?;
Ok(())
}
pub fn load_tremor_state(path: &Path) -> M1ndResult<TremorRegistry> {
if !path.exists() {
return Ok(TremorRegistry::with_defaults());
}
let data = std::fs::read_to_string(path)?;
let format: TremorPersistenceFormat =
serde_json::from_str(&data).map_err(crate::error::M1ndError::Serde)?;
let mut registry = TremorRegistry::new(DEFAULT_MAX_OBSERVATIONS);
for (external_id, obs_vec) in format.nodes {
let mut queue = VecDeque::with_capacity(obs_vec.len().min(DEFAULT_MAX_OBSERVATIONS));
for obs in obs_vec {
if !obs.timestamp.is_finite() || !obs.weight_delta.is_finite() {
continue;
}
queue.push_back(obs);
if queue.len() >= DEFAULT_MAX_OBSERVATIONS {
queue.pop_front();
}
}
if !queue.is_empty() {
registry.observations.insert(external_id, queue);
}
}
Ok(registry)
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
fn make_registry() -> TremorRegistry {
TremorRegistry::with_defaults()
}
fn push_obs(reg: &mut TremorRegistry, id: &str, deltas: &[f32], base_time: f64) {
for (i, &delta) in deltas.iter().enumerate() {
reg.record_observation(id, delta, 1, base_time + i as f64 * 2.0);
}
}
#[test]
fn record_observation_increments_count() {
let mut reg = make_registry();
assert_eq!(reg.observation_count("node::a"), 0);
reg.record_observation("node::a", 0.5, 1, 1000.0);
assert_eq!(reg.observation_count("node::a"), 1);
reg.record_observation("node::a", 0.3, 2, 1002.0);
assert_eq!(reg.observation_count("node::a"), 2);
}
#[test]
fn ring_buffer_evicts_oldest_at_capacity() {
let cap = 4;
let mut reg = TremorRegistry::new(cap);
for i in 0..cap + 2 {
reg.record_observation("node::b", 0.1, 1, i as f64 * 10.0);
}
assert_eq!(reg.observation_count("node::b"), cap);
}
#[test]
fn no_tremors_for_stable_node() {
let mut reg = make_registry();
push_obs(
&mut reg,
"node::stable",
&[0.01, 0.01, 0.01, 0.01, 0.01],
1000.0,
);
let result = reg.analyze(TremorWindow::All, DEFAULT_THRESHOLD, 20, None, 2000.0, 0);
assert!(
result.tremors.is_empty(),
"Expected no tremors for stable node, got {:?}",
result.tremors
);
}
#[test]
fn acceleration_detected_for_rapidly_changing_node() {
let mut reg = make_registry();
push_obs(&mut reg, "node::hot", &[0.1, 1.0, 5.0, 20.0, 80.0], 1000.0);
let result = reg.analyze(TremorWindow::All, 0.0, 20, None, 2000.0, 0);
assert!(
!result.tremors.is_empty(),
"Expected tremor alert for accelerating node"
);
let alert = &result.tremors[0];
assert_eq!(alert.node_id, "node::hot");
assert_eq!(alert.direction, TremorDirection::Accelerating);
}
#[test]
fn deceleration_produces_decelerating_direction() {
let mut reg = make_registry();
push_obs(
&mut reg,
"node::cooling",
&[80.0, 20.0, 5.0, 1.0, 0.1],
1000.0,
);
let result = reg.analyze(TremorWindow::All, 0.0, 20, None, 2000.0, 0);
let found = result.tremors.iter().find(|a| a.node_id == "node::cooling");
assert!(found.is_some(), "Expected tremor for decelerating node");
assert_eq!(found.unwrap().direction, TremorDirection::Decelerating);
}
#[test]
fn min_observations_filters_sparse_nodes() {
let mut reg = make_registry();
reg.record_observation("node::sparse", 1.0, 1, 1000.0);
reg.record_observation("node::sparse", 10.0, 1, 1002.0);
let result = reg.analyze(TremorWindow::All, 0.0, 20, None, 2000.0, 0);
assert!(result.tremors.iter().all(|a| a.node_id != "node::sparse"));
}
#[test]
fn threshold_gates_alerts() {
let mut reg = make_registry();
push_obs(&mut reg, "node::weak", &[0.1, 0.5, 1.0, 2.0, 4.0], 1000.0);
let result_zero = reg.analyze(TremorWindow::All, 0.0, 20, None, 2000.0, 0);
let result_max = reg.analyze(TremorWindow::All, MAGNITUDE_CAP, 20, None, 2000.0, 0);
assert!(result_max.tremors.is_empty());
let _ = result_zero; }
#[test]
fn save_load_round_trip() {
let mut reg = make_registry();
push_obs(
&mut reg,
"node::persist",
&[1.0, 2.0, 3.0, 4.0, 5.0],
1000.0,
);
let count_before = reg.observation_count("node::persist");
let dir = std::env::temp_dir();
let path: PathBuf = dir.join(format!("tremor_test_{}.json", std::process::id()));
save_tremor_state(®, &path).expect("save failed");
let loaded = load_tremor_state(&path).expect("load failed");
assert_eq!(loaded.observation_count("node::persist"), count_before);
let _ = std::fs::remove_file(&path);
}
#[test]
fn window_filter_excludes_old_observations() {
let mut reg = make_registry();
let now = 1_000_000.0f64;
let old_base = now - 100.0 * 86400.0;
push_obs(
&mut reg,
"node::old",
&[1.0, 5.0, 10.0, 20.0, 40.0],
old_base,
);
let result = reg.analyze(TremorWindow::Days30, 0.0, 20, None, now, 0);
assert!(
result.tremors.iter().all(|a| a.node_id != "node::old"),
"Old observations should be excluded by 30d window"
);
}
}
fn linear_regression_slope(x: &[f64], y: &[f32]) -> f32 {
let n = x.len();
if n < 2 {
return 0.0;
}
let n_f = n as f64;
let sum_x: f64 = x.iter().sum();
let sum_y: f64 = y.iter().map(|v| *v as f64).sum();
let sum_xy: f64 = x
.iter()
.zip(y.iter())
.map(|(xi, yi)| *xi * (*yi as f64))
.sum();
let sum_x2: f64 = x.iter().map(|xi| xi * xi).sum();
let denom = n_f * sum_x2 - sum_x * sum_x;
if denom.abs() < 1e-12 {
return 0.0;
}
let slope = (n_f * sum_xy - sum_x * sum_y) / denom;
slope as f32
}