use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
CutIncreased,
CutDecreased,
ThresholdCrossedBelow,
ThresholdCrossedAbove,
Disconnected,
Connected,
EdgeInserted,
EdgeDeleted,
}
impl EventType {
fn as_str(&self) -> &'static str {
match self {
EventType::CutIncreased => "cut_increased",
EventType::CutDecreased => "cut_decreased",
EventType::ThresholdCrossedBelow => "threshold_crossed_below",
EventType::ThresholdCrossedAbove => "threshold_crossed_above",
EventType::Disconnected => "disconnected",
EventType::Connected => "connected",
EventType::EdgeInserted => "edge_inserted",
EventType::EdgeDeleted => "edge_deleted",
}
}
}
#[derive(Debug, Clone)]
pub struct MinCutEvent {
pub event_type: EventType,
pub new_value: f64,
pub old_value: f64,
pub timestamp: Instant,
pub threshold: Option<f64>,
pub edge: Option<(u64, u64)>,
}
pub type EventCallback = Box<dyn Fn(&MinCutEvent) + Send + Sync>;
#[derive(Debug, Clone)]
pub struct Threshold {
pub value: f64,
pub name: String,
pub alert_below: bool,
pub enabled: bool,
last_state: Option<bool>,
}
impl Threshold {
pub fn new(value: f64, name: String, alert_below: bool) -> Self {
Self {
value,
name,
alert_below,
enabled: true,
last_state: None,
}
}
fn check_crossing(&mut self, old_value: f64, new_value: f64) -> Option<EventType> {
let new_state = if self.alert_below {
new_value < self.value
} else {
new_value > self.value
};
let old_state = if let Some(last) = self.last_state {
last
} else {
let initial_state = if self.alert_below {
old_value < self.value
} else {
old_value > self.value
};
self.last_state = Some(initial_state);
initial_state
};
self.last_state = Some(new_state);
if old_state != new_state && new_state {
Some(if self.alert_below {
EventType::ThresholdCrossedBelow
} else {
EventType::ThresholdCrossedAbove
})
} else {
None
}
}
}
#[derive(Debug, Clone)]
pub struct MonitorMetrics {
pub total_events: u64,
pub events_by_type: HashMap<String, u64>,
pub cut_history: Vec<(Instant, f64)>,
pub avg_cut: f64,
pub min_observed: f64,
pub max_observed: f64,
pub threshold_violations: u64,
pub time_since_last_event: Option<Duration>,
}
impl Default for MonitorMetrics {
fn default() -> Self {
Self {
total_events: 0,
events_by_type: HashMap::new(),
cut_history: Vec::new(),
avg_cut: 0.0,
min_observed: f64::INFINITY,
max_observed: f64::NEG_INFINITY,
threshold_violations: 0,
time_since_last_event: None,
}
}
}
#[derive(Debug, Clone)]
pub struct MonitorConfig {
pub max_callbacks: usize,
pub sample_interval: Duration,
pub max_history_size: usize,
pub collect_metrics: bool,
}
impl Default for MonitorConfig {
fn default() -> Self {
Self {
max_callbacks: 100,
sample_interval: Duration::from_secs(1),
max_history_size: 1000,
collect_metrics: true,
}
}
}
struct CallbackEntry {
name: String,
callback: EventCallback,
event_filter: Option<EventType>,
}
pub struct MinCutMonitor {
callbacks: RwLock<Vec<CallbackEntry>>,
thresholds: RwLock<Vec<Threshold>>,
metrics: RwLock<MonitorMetrics>,
current_cut: RwLock<f64>,
config: MonitorConfig,
start_time: Instant,
last_event: RwLock<Option<Instant>>,
last_sample: RwLock<Instant>,
}
impl MinCutMonitor {
pub fn new(config: MonitorConfig) -> Self {
let now = Instant::now();
Self {
callbacks: RwLock::new(Vec::new()),
thresholds: RwLock::new(Vec::new()),
metrics: RwLock::new(MonitorMetrics::default()),
current_cut: RwLock::new(0.0),
config,
start_time: now,
last_event: RwLock::new(None),
last_sample: RwLock::new(now),
}
}
pub fn on_event<F>(&self, name: &str, callback: F) -> crate::Result<()>
where
F: Fn(&MinCutEvent) + Send + Sync + 'static,
{
let mut callbacks = self.callbacks.write();
if callbacks.len() >= self.config.max_callbacks {
return Err(crate::MinCutError::InvalidParameter(format!(
"Maximum number of callbacks ({}) reached",
self.config.max_callbacks
)));
}
callbacks.push(CallbackEntry {
name: name.to_string(),
callback: Box::new(callback),
event_filter: None,
});
Ok(())
}
pub fn on_event_type<F>(
&self,
event_type: EventType,
name: &str,
callback: F,
) -> crate::Result<()>
where
F: Fn(&MinCutEvent) + Send + Sync + 'static,
{
let mut callbacks = self.callbacks.write();
if callbacks.len() >= self.config.max_callbacks {
return Err(crate::MinCutError::InvalidParameter(format!(
"Maximum number of callbacks ({}) reached",
self.config.max_callbacks
)));
}
callbacks.push(CallbackEntry {
name: name.to_string(),
callback: Box::new(callback),
event_filter: Some(event_type),
});
Ok(())
}
pub fn add_threshold(&self, threshold: Threshold) -> crate::Result<()> {
let mut thresholds = self.thresholds.write();
if thresholds.iter().any(|t| t.name == threshold.name) {
return Err(crate::MinCutError::InvalidParameter(format!(
"Threshold with name '{}' already exists",
threshold.name
)));
}
thresholds.push(threshold);
Ok(())
}
pub fn remove_threshold(&self, name: &str) -> bool {
let mut thresholds = self.thresholds.write();
if let Some(pos) = thresholds.iter().position(|t| t.name == name) {
thresholds.remove(pos);
true
} else {
false
}
}
pub fn remove_callback(&self, name: &str) -> bool {
let mut callbacks = self.callbacks.write();
if let Some(pos) = callbacks.iter().position(|c| c.name == name) {
callbacks.remove(pos);
true
} else {
false
}
}
pub fn notify(&self, old_value: f64, new_value: f64, edge: Option<(u64, u64)>) {
let now = Instant::now();
*self.current_cut.write() = new_value;
let base_event_type = if new_value > old_value {
EventType::CutIncreased
} else if new_value < old_value {
EventType::CutDecreased
} else {
if edge.is_some() {
EventType::EdgeInserted
} else {
return; }
};
let mut events = Vec::new();
if new_value != old_value {
events.push(MinCutEvent {
event_type: base_event_type,
new_value,
old_value,
timestamp: now,
threshold: None,
edge,
});
}
if edge.is_some() {
let edge_event = if new_value >= old_value {
EventType::EdgeInserted
} else {
EventType::EdgeDeleted
};
events.push(MinCutEvent {
event_type: edge_event,
new_value,
old_value,
timestamp: now,
threshold: None,
edge,
});
}
if old_value > 0.0 && new_value == 0.0 {
events.push(MinCutEvent {
event_type: EventType::Disconnected,
new_value,
old_value,
timestamp: now,
threshold: None,
edge,
});
} else if old_value == 0.0 && new_value > 0.0 {
events.push(MinCutEvent {
event_type: EventType::Connected,
new_value,
old_value,
timestamp: now,
threshold: None,
edge,
});
}
let threshold_events = self.check_thresholds(old_value, new_value);
for (threshold, event_type) in threshold_events {
events.push(MinCutEvent {
event_type,
new_value,
old_value,
timestamp: now,
threshold: Some(threshold.value),
edge,
});
}
for event in events {
self.fire_event(event);
}
*self.last_event.write() = Some(now);
}
pub fn metrics(&self) -> MonitorMetrics {
let mut metrics = self.metrics.read().clone();
if let Some(last) = *self.last_event.read() {
metrics.time_since_last_event = Some(Instant::now().duration_since(last));
}
metrics
}
pub fn reset_metrics(&self) {
let mut metrics = self.metrics.write();
*metrics = MonitorMetrics::default();
}
pub fn current_cut(&self) -> f64 {
*self.current_cut.read()
}
pub fn threshold_status(&self) -> Vec<(String, bool)> {
let thresholds = self.thresholds.read();
let current = *self.current_cut.read();
thresholds
.iter()
.map(|t| {
let active = if t.alert_below {
current < t.value
} else {
current > t.value
};
(t.name.clone(), active && t.enabled)
})
.collect()
}
fn fire_event(&self, event: MinCutEvent) {
if self.config.collect_metrics {
self.update_metrics(&event);
}
let callbacks = self.callbacks.read();
for entry in callbacks.iter() {
if let Some(filter) = entry.event_filter {
if filter != event.event_type {
continue;
}
}
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
(entry.callback)(&event);
}));
if result.is_err() {
eprintln!(
"Warning: Callback '{}' panicked during execution",
entry.name
);
}
}
}
fn check_thresholds(&self, old_value: f64, new_value: f64) -> Vec<(Threshold, EventType)> {
let mut thresholds = self.thresholds.write();
let mut result = Vec::new();
for threshold in thresholds.iter_mut() {
if !threshold.enabled {
continue;
}
if let Some(event_type) = threshold.check_crossing(old_value, new_value) {
result.push((threshold.clone(), event_type));
}
}
result
}
fn update_metrics(&self, event: &MinCutEvent) {
let mut metrics = self.metrics.write();
metrics.total_events += 1;
let type_str = event.event_type.as_str().to_string();
*metrics.events_by_type.entry(type_str).or_insert(0) += 1;
if event.new_value < metrics.min_observed {
metrics.min_observed = event.new_value;
}
if event.new_value > metrics.max_observed {
metrics.max_observed = event.new_value;
}
if metrics.total_events == 1 {
metrics.avg_cut = event.new_value;
} else {
let n = metrics.total_events as f64;
metrics.avg_cut = (metrics.avg_cut * (n - 1.0) + event.new_value) / n;
}
if matches!(
event.event_type,
EventType::ThresholdCrossedBelow | EventType::ThresholdCrossedAbove
) {
metrics.threshold_violations += 1;
}
let mut last_sample = self.last_sample.write();
if event.timestamp.duration_since(*last_sample) >= self.config.sample_interval {
metrics.cut_history.push((event.timestamp, event.new_value));
if metrics.cut_history.len() > self.config.max_history_size {
metrics.cut_history.remove(0);
}
*last_sample = event.timestamp;
}
}
}
pub struct MonitorBuilder {
config: MonitorConfig,
thresholds: Vec<Threshold>,
callbacks: Vec<(String, EventCallback, Option<EventType>)>,
}
impl MonitorBuilder {
pub fn new() -> Self {
Self {
config: MonitorConfig::default(),
thresholds: Vec::new(),
callbacks: Vec::new(),
}
}
pub fn with_config(mut self, config: MonitorConfig) -> Self {
self.config = config;
self
}
pub fn threshold_below(mut self, value: f64, name: &str) -> Self {
self.thresholds
.push(Threshold::new(value, name.to_string(), true));
self
}
pub fn threshold_above(mut self, value: f64, name: &str) -> Self {
self.thresholds
.push(Threshold::new(value, name.to_string(), false));
self
}
pub fn on_change<F>(mut self, name: &str, callback: F) -> Self
where
F: Fn(&MinCutEvent) + Send + Sync + 'static,
{
self.callbacks
.push((name.to_string(), Box::new(callback), None));
self
}
pub fn on_event_type<F>(mut self, event_type: EventType, name: &str, callback: F) -> Self
where
F: Fn(&MinCutEvent) + Send + Sync + 'static,
{
self.callbacks
.push((name.to_string(), Box::new(callback), Some(event_type)));
self
}
pub fn build(self) -> MinCutMonitor {
let monitor = MinCutMonitor::new(self.config);
for threshold in self.thresholds {
let _ = monitor.add_threshold(threshold);
}
for (name, callback, filter) in self.callbacks {
let mut callbacks = monitor.callbacks.write();
callbacks.push(CallbackEntry {
name,
callback,
event_filter: filter,
});
}
monitor
}
}
impl Default for MonitorBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
#[test]
fn test_event_type_str() {
assert_eq!(EventType::CutIncreased.as_str(), "cut_increased");
assert_eq!(EventType::CutDecreased.as_str(), "cut_decreased");
assert_eq!(
EventType::ThresholdCrossedBelow.as_str(),
"threshold_crossed_below"
);
}
#[test]
fn test_threshold_crossing_below() {
let mut threshold = Threshold::new(10.0, "test".to_string(), true);
let event = threshold.check_crossing(15.0, 5.0);
assert_eq!(event, Some(EventType::ThresholdCrossedBelow));
let event = threshold.check_crossing(5.0, 3.0);
assert_eq!(event, None);
let event = threshold.check_crossing(3.0, 15.0);
assert_eq!(event, None);
let event = threshold.check_crossing(15.0, 5.0);
assert_eq!(event, Some(EventType::ThresholdCrossedBelow));
}
#[test]
fn test_threshold_crossing_above() {
let mut threshold = Threshold::new(10.0, "test".to_string(), false);
let event = threshold.check_crossing(5.0, 15.0);
assert_eq!(event, Some(EventType::ThresholdCrossedAbove));
let event = threshold.check_crossing(15.0, 20.0);
assert_eq!(event, None);
let event = threshold.check_crossing(20.0, 5.0);
assert_eq!(event, None);
let event = threshold.check_crossing(5.0, 15.0);
assert_eq!(event, Some(EventType::ThresholdCrossedAbove));
}
#[test]
fn test_monitor_creation() {
let config = MonitorConfig::default();
let monitor = MinCutMonitor::new(config);
assert_eq!(monitor.current_cut(), 0.0);
assert_eq!(monitor.metrics().total_events, 0);
}
#[test]
fn test_callback_registration() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let counter = Arc::new(AtomicU64::new(0));
let counter_clone = counter.clone();
monitor
.on_event("test", move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
monitor.notify(0.0, 10.0, None);
std::thread::sleep(Duration::from_millis(10));
assert!(counter.load(Ordering::SeqCst) > 0);
}
#[test]
fn test_event_type_filtering() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let counter = Arc::new(AtomicU64::new(0));
let counter_clone = counter.clone();
monitor
.on_event_type(EventType::CutIncreased, "test", move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
monitor.notify(5.0, 10.0, None);
std::thread::sleep(Duration::from_millis(10));
let count1 = counter.load(Ordering::SeqCst);
assert!(count1 > 0);
monitor.notify(10.0, 5.0, None);
std::thread::sleep(Duration::from_millis(10));
let count2 = counter.load(Ordering::SeqCst);
assert_eq!(count1, count2);
}
#[test]
fn test_threshold_monitoring() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let threshold = Threshold::new(10.0, "critical".to_string(), true);
monitor.add_threshold(threshold).unwrap();
let counter = Arc::new(AtomicU64::new(0));
let counter_clone = counter.clone();
monitor
.on_event_type(
EventType::ThresholdCrossedBelow,
"threshold_cb",
move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
},
)
.unwrap();
monitor.notify(15.0, 5.0, None);
std::thread::sleep(Duration::from_millis(10));
assert!(counter.load(Ordering::SeqCst) > 0);
}
#[test]
fn test_metrics_collection() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
monitor.notify(0.0, 10.0, None);
monitor.notify(10.0, 20.0, None);
monitor.notify(20.0, 15.0, None);
std::thread::sleep(Duration::from_millis(10));
let metrics = monitor.metrics();
assert!(metrics.total_events >= 3); assert!(metrics.max_observed >= 20.0);
assert!(metrics.min_observed <= 10.0);
}
#[test]
fn test_callback_removal() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let counter = Arc::new(AtomicU64::new(0));
let counter_clone = counter.clone();
monitor
.on_event("test", move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
monitor.notify(0.0, 10.0, None);
std::thread::sleep(Duration::from_millis(10));
let count1 = counter.load(Ordering::SeqCst);
assert!(count1 > 0);
assert!(monitor.remove_callback("test"));
monitor.notify(10.0, 20.0, None);
std::thread::sleep(Duration::from_millis(10));
let count2 = counter.load(Ordering::SeqCst);
assert_eq!(count1, count2);
}
#[test]
fn test_threshold_removal() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let threshold = Threshold::new(10.0, "test".to_string(), true);
monitor.add_threshold(threshold).unwrap();
assert!(monitor.remove_threshold("test"));
assert!(!monitor.remove_threshold("test")); }
#[test]
fn test_builder_pattern() {
let counter = Arc::new(AtomicU64::new(0));
let counter_clone = counter.clone();
let monitor = MonitorBuilder::new()
.threshold_below(10.0, "low")
.threshold_above(100.0, "high")
.on_change("test", move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.build();
monitor.notify(50.0, 5.0, None);
std::thread::sleep(Duration::from_millis(10));
assert!(counter.load(Ordering::SeqCst) > 0);
let status = monitor.threshold_status();
assert_eq!(status.len(), 2);
}
#[test]
fn test_connectivity_events() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let disconnected = Arc::new(AtomicU64::new(0));
let connected = Arc::new(AtomicU64::new(0));
let disc_clone = disconnected.clone();
let conn_clone = connected.clone();
monitor
.on_event_type(EventType::Disconnected, "disc", move |_| {
disc_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
monitor
.on_event_type(EventType::Connected, "conn", move |_| {
conn_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
monitor.notify(10.0, 0.0, None);
std::thread::sleep(Duration::from_millis(10));
assert!(disconnected.load(Ordering::SeqCst) > 0);
monitor.notify(0.0, 5.0, None);
std::thread::sleep(Duration::from_millis(10));
assert!(connected.load(Ordering::SeqCst) > 0);
}
#[test]
fn test_max_callbacks_limit() {
let config = MonitorConfig {
max_callbacks: 2,
..Default::default()
};
let monitor = MinCutMonitor::new(config);
assert!(monitor.on_event("cb1", |_| {}).is_ok());
assert!(monitor.on_event("cb2", |_| {}).is_ok());
assert!(monitor.on_event("cb3", |_| {}).is_err());
}
#[test]
fn test_duplicate_threshold_name() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let t1 = Threshold::new(10.0, "test".to_string(), true);
let t2 = Threshold::new(20.0, "test".to_string(), false);
assert!(monitor.add_threshold(t1).is_ok());
assert!(monitor.add_threshold(t2).is_err());
}
#[test]
fn test_metrics_reset() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
monitor.notify(0.0, 10.0, None);
monitor.notify(10.0, 20.0, None);
let metrics1 = monitor.metrics();
assert!(metrics1.total_events > 0);
monitor.reset_metrics();
let metrics2 = monitor.metrics();
assert_eq!(metrics2.total_events, 0);
}
#[test]
fn test_edge_events() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let counter = Arc::new(AtomicU64::new(0));
let counter_clone = counter.clone();
monitor
.on_event_type(EventType::EdgeInserted, "edge", move |event| {
assert!(event.edge.is_some());
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
monitor.notify(10.0, 15.0, Some((1, 2)));
std::thread::sleep(Duration::from_millis(10));
assert!(counter.load(Ordering::SeqCst) > 0);
}
#[test]
fn test_threshold_hysteresis() {
let mut threshold = Threshold::new(10.0, "test".to_string(), true);
assert_eq!(
threshold.check_crossing(15.0, 5.0),
Some(EventType::ThresholdCrossedBelow)
);
assert_eq!(threshold.check_crossing(5.0, 3.0), None);
assert_eq!(threshold.check_crossing(3.0, 8.0), None);
assert_eq!(threshold.check_crossing(8.0, 15.0), None);
assert_eq!(
threshold.check_crossing(15.0, 5.0),
Some(EventType::ThresholdCrossedBelow)
);
}
#[test]
fn test_concurrent_callbacks() {
let monitor = Arc::new(MinCutMonitor::new(MonitorConfig::default()));
let counter = Arc::new(AtomicU64::new(0));
for i in 0..10 {
let counter_clone = counter.clone();
monitor
.on_event(&format!("cb{}", i), move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
}
let handles: Vec<_> = (0..5)
.map(|i| {
let monitor_clone = monitor.clone();
std::thread::spawn(move || {
monitor_clone.notify(i as f64, (i + 1) as f64, None);
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
std::thread::sleep(Duration::from_millis(50));
assert!(counter.load(Ordering::SeqCst) > 0);
}
#[test]
fn test_average_calculation() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
monitor.notify(0.0, 10.0, None);
monitor.notify(10.0, 20.0, None);
monitor.notify(20.0, 30.0, None);
let metrics = monitor.metrics();
assert!(metrics.avg_cut > 0.0);
}
#[test]
fn test_history_sampling() {
let config = MonitorConfig {
sample_interval: Duration::from_millis(1),
max_history_size: 5,
..Default::default()
};
let monitor = MinCutMonitor::new(config);
for i in 0..10 {
monitor.notify(i as f64, (i + 1) as f64, None);
std::thread::sleep(Duration::from_millis(2));
}
let metrics = monitor.metrics();
assert!(metrics.cut_history.len() <= 5);
}
#[test]
fn test_no_change_no_event() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
let counter = Arc::new(AtomicU64::new(0));
let counter_clone = counter.clone();
monitor
.on_event("test", move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
monitor.notify(10.0, 10.0, None);
std::thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 0);
}
#[test]
fn test_threshold_status() {
let monitor = MinCutMonitor::new(MonitorConfig::default());
monitor
.add_threshold(Threshold::new(10.0, "low".to_string(), true))
.unwrap();
monitor
.add_threshold(Threshold::new(100.0, "high".to_string(), false))
.unwrap();
monitor.notify(0.0, 50.0, None);
let status = monitor.threshold_status();
assert_eq!(status.len(), 2);
for (name, active) in &status {
assert!(!active, "Threshold {} should not be active at 50", name);
}
}
}