Skip to main content

nexus_memory_hooks/
detector.rs

//! Inactivity detector for session timeout detection.
//!
//! This module provides the TERTIARY layer of detection with ~90% reliability.
4
5use chrono::{DateTime, Utc};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{broadcast, RwLock};
10use tokio::time::interval;
11
12use crate::monitor::MonitorEvent;
13
14/// Inactivity detector for detecting session timeouts
15///
16/// This is the TERTIARY layer with ~90% reliability. It triggers
17/// when an agent session has been inactive for too long.
18#[derive(Clone)]
19pub struct InactivityDetector {
20    /// Inactivity threshold
21    threshold: Duration,
22
23    /// Last activity times
24    last_activity: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
25
26    /// Event sender
27    event_sender: broadcast::Sender<MonitorEvent>,
28
29    /// Check interval
30    check_interval: Duration,
31
32    /// Active flag
33    active: Arc<RwLock<bool>>,
34}
35
36impl InactivityDetector {
37    /// Create a new inactivity detector with default threshold (5 minutes)
38    pub fn new() -> Self {
39        Self::with_threshold(Duration::from_secs(300))
40    }
41
42    /// Create with custom threshold
43    pub fn with_threshold(threshold: Duration) -> Self {
44        let (event_sender, _) = broadcast::channel(100);
45
46        Self {
47            threshold,
48            last_activity: Arc::new(RwLock::new(HashMap::new())),
49            event_sender,
50            check_interval: Duration::from_secs(30),
51            active: Arc::new(RwLock::new(false)),
52        }
53    }
54
55    /// Set check interval
56    pub fn with_check_interval(mut self, interval: Duration) -> Self {
57        self.check_interval = interval;
58        self
59    }
60
61    /// Subscribe to events
62    pub fn subscribe(&self) -> broadcast::Receiver<MonitorEvent> {
63        self.event_sender.subscribe()
64    }
65
66    /// Record activity for an agent
67    pub async fn record_activity(&self, agent_type: &str) {
68        let mut activity = self.last_activity.write().await;
69        activity.insert(agent_type.to_string(), Utc::now());
70    }
71
72    /// Check if agent is inactive
73    pub async fn is_inactive(&self, agent_type: &str) -> bool {
74        let activity = self.last_activity.read().await;
75
76        if let Some(last) = activity.get(agent_type) {
77            let elapsed = (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO);
78            elapsed > self.threshold
79        } else {
80            false
81        }
82    }
83
84    /// Get inactive duration
85    pub async fn get_inactive_duration(&self, agent_type: &str) -> Option<Duration> {
86        let activity = self.last_activity.read().await;
87
88        activity
89            .get(agent_type)
90            .map(|last| (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO))
91    }
92
93    /// Check if agent has been inactive and trigger event
94    async fn check_and_notify(&self, agent_type: &str) -> Option<Duration> {
95        if let Some(duration) = self.get_inactive_duration(agent_type).await {
96            if duration > self.threshold {
97                let _ = self.event_sender.send(MonitorEvent::InactivityDetected {
98                    agent_type: agent_type.to_string(),
99                    inactive_for_secs: duration.as_secs(),
100                });
101                return Some(duration);
102            }
103        }
104        None
105    }
106
107    /// Start monitoring for inactivity
108    pub async fn start_monitoring(&self, agent_types: Vec<String>) {
109        let mut active = self.active.write().await;
110        if *active {
111            return;
112        }
113        *active = true;
114        drop(active);
115
116        let detector = self.clone();
117
118        tokio::spawn(async move {
119            let mut ticker = interval(detector.check_interval);
120
121            loop {
122                ticker.tick().await;
123
124                // Check if still active
125                {
126                    let a = detector.active.read().await;
127                    if !*a {
128                        break;
129                    }
130                }
131
132                for agent_type in &agent_types {
133                    let _ = detector.check_and_notify(agent_type).await;
134                }
135            }
136        });
137    }
138
139    /// Stop monitoring
140    pub async fn stop_monitoring(&self) {
141        let mut active = self.active.write().await;
142        *active = false;
143    }
144
145    /// Clear activity for an agent
146    pub async fn clear_activity(&self, agent_type: &str) {
147        let mut activity = self.last_activity.write().await;
148        activity.remove(agent_type);
149    }
150}
151
152impl Default for InactivityDetector {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// Agent key shared by the activity tests.
    const AGENT: &str = "test-agent";

    /// `new` must configure the five-minute (300 s) threshold.
    #[tokio::test]
    async fn test_inactivity_detector_new() {
        let expected_threshold = Duration::from_secs(300);
        assert_eq!(InactivityDetector::new().threshold, expected_threshold);
    }

    /// A freshly recorded agent is not inactive and has a tiny idle time.
    #[tokio::test]
    async fn test_record_and_check_activity() {
        let detector = InactivityDetector::new();
        detector.record_activity(AGENT).await;

        // Activity was recorded just now, far inside the threshold.
        assert!(!detector.is_inactive(AGENT).await);

        // An idle duration must exist and be well under a second.
        let idle = detector.get_inactive_duration(AGENT).await;
        assert!(idle.is_some());
        assert!(idle.unwrap() < Duration::from_secs(1));
    }

    /// Clearing an agent removes its recorded activity.
    #[tokio::test]
    async fn test_clear_activity() {
        let detector = InactivityDetector::new();

        detector.record_activity(AGENT).await;
        detector.clear_activity(AGENT).await;

        assert!(detector.get_inactive_duration(AGENT).await.is_none());
    }

    /// An event sent on the detector's channel reaches a subscriber.
    #[tokio::test]
    async fn test_subscribe_events() {
        let detector = InactivityDetector::new();
        let mut receiver = detector.subscribe();

        // Push an event straight through the sender.
        let event = MonitorEvent::InactivityDetected {
            agent_type: "test".to_string(),
            inactive_for_secs: 300,
        };
        let _ = detector.event_sender.send(event);

        assert!(receiver.try_recv().is_ok());
    }
}