use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio::time::interval;
use crate::monitor::MonitorEvent;
/// Tracks the last recorded activity time per agent type and broadcasts a
/// `MonitorEvent::InactivityDetected` event when an agent has been idle for
/// longer than the configured threshold.
///
/// The activity map and the `active` flag are held behind `Arc`, so clones of
/// a detector share them.
#[derive(Clone)]
pub struct InactivityDetector {
/// Idle time that must be exceeded before an agent counts as inactive.
threshold: Duration,
/// Timestamp of the most recent recorded activity, keyed by agent type.
last_activity: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
/// Sending half of the broadcast channel used to publish inactivity events.
event_sender: broadcast::Sender<MonitorEvent>,
/// Period between checks performed by the background monitoring task.
check_interval: Duration,
/// `true` while a background monitoring task is supposed to keep running.
active: Arc<RwLock<bool>>,
}
impl InactivityDetector {
    /// Inactivity threshold used by [`InactivityDetector::new`].
    const DEFAULT_THRESHOLD: Duration = Duration::from_secs(300);
    /// Period between background checks unless overridden with
    /// [`InactivityDetector::with_check_interval`].
    const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_secs(30);
    /// Capacity of the broadcast channel carrying inactivity events.
    const EVENT_CHANNEL_CAPACITY: usize = 100;
    /// Smallest period handed to the background ticker; see
    /// [`InactivityDetector::start_monitoring`].
    const MIN_CHECK_INTERVAL: Duration = Duration::from_millis(1);

    /// Creates a detector with the default 300-second inactivity threshold.
    pub fn new() -> Self {
        Self::with_threshold(Self::DEFAULT_THRESHOLD)
    }

    /// Creates a detector that considers an agent inactive once more than
    /// `threshold` has elapsed since its last recorded activity.
    ///
    /// The detector starts with no recorded activity, a 30-second check
    /// interval, and monitoring switched off.
    pub fn with_threshold(threshold: Duration) -> Self {
        // The initial receiver is dropped; consumers obtain one via `subscribe`.
        let (event_sender, _) = broadcast::channel(Self::EVENT_CHANNEL_CAPACITY);
        Self {
            threshold,
            last_activity: Arc::new(RwLock::new(HashMap::new())),
            event_sender,
            check_interval: Self::DEFAULT_CHECK_INTERVAL,
            active: Arc::new(RwLock::new(false)),
        }
    }

    /// Builder-style setter for the period between background checks.
    ///
    /// The value is stored as given. A zero interval is raised to one
    /// millisecond when monitoring starts.
    pub fn with_check_interval(mut self, interval: Duration) -> Self {
        self.check_interval = interval;
        self
    }

    /// Returns a new receiver on the detector's inactivity event channel.
    pub fn subscribe(&self) -> broadcast::Receiver<MonitorEvent> {
        self.event_sender.subscribe()
    }

    /// Records the current UTC time as the latest activity of `agent_type`,
    /// replacing any previous timestamp.
    pub async fn record_activity(&self, agent_type: &str) {
        let mut activity = self.last_activity.write().await;
        activity.insert(agent_type.to_string(), Utc::now());
    }

    /// Returns `true` when `agent_type` has recorded activity and has been
    /// idle for strictly longer than the threshold.
    ///
    /// Agents with no recorded activity are reported as not inactive.
    pub async fn is_inactive(&self, agent_type: &str) -> bool {
        // Delegate to the single elapsed-time computation so the two methods
        // cannot drift apart.
        matches!(
            self.get_inactive_duration(agent_type).await,
            Some(idle) if idle > self.threshold
        )
    }

    /// Returns how long ago `agent_type` last recorded activity, or `None`
    /// when no activity is recorded for it.
    ///
    /// If the elapsed time cannot be converted to a `std::time::Duration`
    /// (for example when the stored timestamp is ahead of the current clock),
    /// zero is returned.
    pub async fn get_inactive_duration(&self, agent_type: &str) -> Option<Duration> {
        let activity = self.last_activity.read().await;
        activity
            .get(agent_type)
            .map(|last| (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO))
    }

    /// Publishes `MonitorEvent::InactivityDetected` when `agent_type` has
    /// been idle for longer than the threshold.
    ///
    /// Returns the idle duration when an event was emitted, otherwise `None`.
    async fn check_and_notify(&self, agent_type: &str) -> Option<Duration> {
        let duration = self.get_inactive_duration(agent_type).await?;
        if duration <= self.threshold {
            return None;
        }
        // The send result is discarded on purpose: a failed send (e.g. no
        // subscribers) must not interrupt monitoring.
        let _ = self.event_sender.send(MonitorEvent::InactivityDetected {
            agent_type: agent_type.to_string(),
            inactive_for_secs: duration.as_secs(),
        });
        Some(duration)
    }

    /// Spawns a background task that checks every entry of `agent_types`
    /// once per check interval and emits an event for each inactive agent.
    ///
    /// Does nothing when monitoring is already active. The task exits on the
    /// first tick after [`InactivityDetector::stop_monitoring`] cleared the
    /// flag.
    ///
    /// NOTE(review): the task only observes the flag after each tick, so a
    /// `stop_monitoring` immediately followed by `start_monitoring` can leave
    /// the earlier task running alongside the new one.
    pub async fn start_monitoring(&self, agent_types: Vec<String>) {
        {
            let mut active = self.active.write().await;
            if *active {
                return;
            }
            *active = true;
        }

        let detector = self.clone();
        // `tokio::time::interval` panics on a zero period. That panic would
        // kill the task while `active` stays `true`, permanently blocking any
        // restart, so clamp to a small positive period instead.
        let period = detector.check_interval.max(Self::MIN_CHECK_INTERVAL);
        tokio::spawn(async move {
            let mut ticker = interval(period);
            loop {
                ticker.tick().await;
                // Copy the flag out so the read guard is released before the checks.
                let still_active = *detector.active.read().await;
                if !still_active {
                    break;
                }
                for agent_type in &agent_types {
                    let _ = detector.check_and_notify(agent_type).await;
                }
            }
        });
    }

    /// Clears the monitoring flag; a running background task stops at its
    /// next tick.
    pub async fn stop_monitoring(&self) {
        let mut active = self.active.write().await;
        *active = false;
    }

    /// Forgets the recorded activity of `agent_type`, if any.
    pub async fn clear_activity(&self, agent_type: &str) {
        let mut activity = self.last_activity.write().await;
        activity.remove(agent_type);
    }
}
impl Default for InactivityDetector {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `new()` configures the 300-second default threshold.
    #[tokio::test]
    async fn test_inactivity_detector_new() {
        let expected = Duration::from_secs(300);
        assert_eq!(InactivityDetector::new().threshold, expected);
    }

    /// A freshly recorded agent is not inactive and reports a sub-second idle time.
    #[tokio::test]
    async fn test_record_and_check_activity() {
        let detector = InactivityDetector::new();
        detector.record_activity("test-agent").await;

        assert!(!detector.is_inactive("test-agent").await);

        let idle = detector.get_inactive_duration("test-agent").await;
        assert!(matches!(idle, Some(elapsed) if elapsed < Duration::from_secs(1)));
    }

    /// Clearing an agent removes its recorded activity.
    #[tokio::test]
    async fn test_clear_activity() {
        let detector = InactivityDetector::new();
        detector.record_activity("test-agent").await;
        detector.clear_activity("test-agent").await;

        let idle = detector.get_inactive_duration("test-agent").await;
        assert!(idle.is_none());
    }

    /// An event sent on the channel reaches an existing subscriber.
    #[tokio::test]
    async fn test_subscribe_events() {
        let detector = InactivityDetector::new();
        let mut receiver = detector.subscribe();

        let event = MonitorEvent::InactivityDetected {
            agent_type: "test".to_string(),
            inactive_for_secs: 300,
        };
        let _ = detector.event_sender.send(event);

        assert!(receiver.try_recv().is_ok());
    }
}