nexus_memory_hooks/
detector.rs1use chrono::{DateTime, Utc};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{broadcast, RwLock};
10use tokio::time::interval;
11
12use crate::monitor::MonitorEvent;
13
14#[derive(Clone)]
19pub struct InactivityDetector {
20 threshold: Duration,
22
23 last_activity: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
25
26 event_sender: broadcast::Sender<MonitorEvent>,
28
29 check_interval: Duration,
31
32 active: Arc<RwLock<bool>>,
34}
35
36impl InactivityDetector {
37 pub fn new() -> Self {
39 Self::with_threshold(Duration::from_secs(300))
40 }
41
42 pub fn with_threshold(threshold: Duration) -> Self {
44 let (event_sender, _) = broadcast::channel(100);
45
46 Self {
47 threshold,
48 last_activity: Arc::new(RwLock::new(HashMap::new())),
49 event_sender,
50 check_interval: Duration::from_secs(30),
51 active: Arc::new(RwLock::new(false)),
52 }
53 }
54
55 pub fn with_check_interval(mut self, interval: Duration) -> Self {
57 self.check_interval = interval;
58 self
59 }
60
61 pub fn subscribe(&self) -> broadcast::Receiver<MonitorEvent> {
63 self.event_sender.subscribe()
64 }
65
66 pub async fn record_activity(&self, agent_type: &str) {
68 let mut activity = self.last_activity.write().await;
69 activity.insert(agent_type.to_string(), Utc::now());
70 }
71
72 pub async fn is_inactive(&self, agent_type: &str) -> bool {
74 let activity = self.last_activity.read().await;
75
76 if let Some(last) = activity.get(agent_type) {
77 let elapsed = (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO);
78 elapsed > self.threshold
79 } else {
80 false
81 }
82 }
83
84 pub async fn get_inactive_duration(&self, agent_type: &str) -> Option<Duration> {
86 let activity = self.last_activity.read().await;
87
88 activity
89 .get(agent_type)
90 .map(|last| (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO))
91 }
92
93 async fn check_and_notify(&self, agent_type: &str) -> Option<Duration> {
95 if let Some(duration) = self.get_inactive_duration(agent_type).await {
96 if duration > self.threshold {
97 let _ = self.event_sender.send(MonitorEvent::InactivityDetected {
98 agent_type: agent_type.to_string(),
99 inactive_for_secs: duration.as_secs(),
100 });
101 return Some(duration);
102 }
103 }
104 None
105 }
106
107 pub async fn start_monitoring(&self, agent_types: Vec<String>) {
109 let mut active = self.active.write().await;
110 if *active {
111 return;
112 }
113 *active = true;
114 drop(active);
115
116 let detector = self.clone();
117
118 tokio::spawn(async move {
119 let mut ticker = interval(detector.check_interval);
120
121 loop {
122 ticker.tick().await;
123
124 {
126 let a = detector.active.read().await;
127 if !*a {
128 break;
129 }
130 }
131
132 for agent_type in &agent_types {
133 let _ = detector.check_and_notify(agent_type).await;
134 }
135 }
136 });
137 }
138
139 pub async fn stop_monitoring(&self) {
141 let mut active = self.active.write().await;
142 *active = false;
143 }
144
145 pub async fn clear_activity(&self, agent_type: &str) {
147 let mut activity = self.last_activity.write().await;
148 activity.remove(agent_type);
149 }
150}
151
152impl Default for InactivityDetector {
153 fn default() -> Self {
154 Self::new()
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// Agent key shared by the activity tests.
    const AGENT: &str = "test-agent";

    /// `new` must configure the 300-second threshold.
    #[tokio::test]
    async fn test_inactivity_detector_new() {
        let expected = Duration::from_secs(300);
        assert_eq!(InactivityDetector::new().threshold, expected);
    }

    /// A freshly recorded agent is active and its idle time is under a second.
    #[tokio::test]
    async fn test_record_and_check_activity() {
        let detector = InactivityDetector::new();
        detector.record_activity(AGENT).await;

        assert!(!detector.is_inactive(AGENT).await);

        let idle = detector
            .get_inactive_duration(AGENT)
            .await
            .expect("activity was just recorded");
        assert!(idle < Duration::from_secs(1));
    }

    /// Clearing an agent removes its recorded timestamp.
    #[tokio::test]
    async fn test_clear_activity() {
        let detector = InactivityDetector::new();
        detector.record_activity(AGENT).await;
        detector.clear_activity(AGENT).await;

        assert!(detector.get_inactive_duration(AGENT).await.is_none());
    }

    /// An event pushed into the channel reaches an existing subscriber.
    #[tokio::test]
    async fn test_subscribe_events() {
        let detector = InactivityDetector::new();
        let mut receiver = detector.subscribe();

        let event = MonitorEvent::InactivityDetected {
            agent_type: "test".to_string(),
            inactive_for_secs: 300,
        };
        let _ = detector.event_sender.send(event);

        assert!(receiver.try_recv().is_ok());
    }
}