Skip to main content

nexus_memory_hooks/
monitor.rs

1//! Session and process monitoring
2//!
3//! This module provides the SECONDARY layer of detection through
4//! process monitoring and session state tracking.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11use sysinfo::{Pid, ProcessStatus, System};
12use tokio::sync::{broadcast, RwLock};
13use tokio::time::interval;
14
15use crate::error::Result;
16use crate::types::{AgentType, ProcessInfo, SessionActivity};
17
18/// Process monitor for tracking agent processes
19pub struct ProcessMonitor {
20    /// sysinfo system handle
21    system: System,
22
23    /// Tracked process IDs
24    tracked_pids: HashMap<u32, AgentType>,
25
26    /// Previous process states
27    previous_states: HashMap<String, bool>,
28}
29
30impl Clone for ProcessMonitor {
31    fn clone(&self) -> Self {
32        Self {
33            system: System::new_all(),
34            tracked_pids: self.tracked_pids.clone(),
35            previous_states: self.previous_states.clone(),
36        }
37    }
38}
39
40impl ProcessMonitor {
41    /// Create a new process monitor
42    pub fn new() -> Self {
43        Self {
44            system: System::new_all(),
45            tracked_pids: HashMap::new(),
46            previous_states: HashMap::new(),
47        }
48    }
49
50    /// Refresh process information
51    pub fn refresh(&mut self) {
52        self.system.refresh_all();
53    }
54
55    /// Track a process by PID
56    pub fn track_process(&mut self, pid: u32, agent_type: AgentType) {
57        self.tracked_pids.insert(pid, agent_type);
58    }
59
60    /// Stop tracking a process
61    pub fn untrack_process(&mut self, pid: u32) {
62        self.tracked_pids.remove(&pid);
63    }
64
65    /// Check if a process is alive
66    pub fn is_process_alive(&self, pid: u32) -> bool {
67        self.system
68            .process(Pid::from(pid as usize))
69            .map(|p| p.status() == ProcessStatus::Run)
70            .unwrap_or(false)
71    }
72
73    /// Get process information
74    pub fn get_process_info(&self, pid: u32) -> Option<ProcessInfo> {
75        self.system
76            .process(Pid::from(pid as usize))
77            .map(|p| ProcessInfo {
78                pid,
79                name: p.name().to_string_lossy().to_string(),
80                status: format!("{:?}", p.status()),
81                command: Some(
82                    p.cmd()
83                        .iter()
84                        .map(|s| s.to_string_lossy().to_string())
85                        .collect::<Vec<_>>()
86                        .join(" "),
87                ),
88                working_dir: p.cwd().map(|p| p.to_string_lossy().to_string()),
89                create_time: chrono::DateTime::from_timestamp(p.start_time() as i64, 0),
90                cpu_percent: Some(p.cpu_usage()),
91                memory_bytes: Some(p.memory()),
92            })
93    }
94
95    /// Find processes matching agent type
96    pub fn find_agent_processes(&mut self, agent_type: AgentType) -> Vec<ProcessInfo> {
97        self.refresh();
98
99        let process_names = agent_type.process_names();
100        let mut found = Vec::new();
101
102        for process in self.system.processes().values() {
103            let name = process.name().to_string_lossy().to_lowercase();
104            let cmd = process
105                .cmd()
106                .iter()
107                .map(|s| s.to_string_lossy().to_lowercase())
108                .collect::<Vec<_>>()
109                .join(" ");
110
111            for pattern in process_names {
112                if name.contains(pattern) || cmd.contains(pattern) {
113                    if let Some(info) = self.get_process_info(process.pid().as_u32()) {
114                        found.push(info);
115                    }
116                    break;
117                }
118            }
119        }
120
121        found
122    }
123
124    /// Monitor tracked processes and return terminated ones
125    pub fn check_terminated(&mut self) -> Vec<(u32, AgentType)> {
126        self.refresh();
127
128        let mut terminated = Vec::new();
129
130        for (pid, agent_type) in &self.tracked_pids {
131            if !self.is_process_alive(*pid) {
132                terminated.push((*pid, *agent_type));
133            }
134        }
135
136        // Untrack terminated processes
137        for (pid, _) in &terminated {
138            self.tracked_pids.remove(pid);
139        }
140
141        terminated
142    }
143}
144
145impl Default for ProcessMonitor {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151/// Event emitted by session monitor
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub enum MonitorEvent {
154    /// Session started
155    SessionStarted {
156        agent_type: String,
157        pid: Option<u32>,
158        timestamp: DateTime<Utc>,
159    },
160
161    /// Session ended
162    SessionEnded {
163        agent_type: String,
164        reason: String,
165        timestamp: DateTime<Utc>,
166    },
167
168    /// Process detected
169    ProcessDetected {
170        agent_type: String,
171        process: ProcessInfo,
172    },
173
174    /// Process terminated
175    ProcessTerminated { agent_type: String, pid: u32 },
176
177    /// Inactivity detected
178    InactivityDetected {
179        agent_type: String,
180        inactive_for_secs: u64,
181    },
182}
183
184/// Session monitor for background process monitoring
185///
186/// This is the SECONDARY layer of detection with ~95% reliability.
187pub struct SessionMonitor {
188    /// Process monitor
189    process_monitor: Arc<RwLock<ProcessMonitor>>,
190
191    /// Event sender
192    event_sender: broadcast::Sender<MonitorEvent>,
193
194    /// Last activity times per agent
195    last_activity: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
196
197    /// Previous session states
198    previous_states: Arc<RwLock<HashMap<String, bool>>>,
199
200    /// Polling interval
201    polling_interval: Duration,
202
203    /// Inactivity threshold
204    inactivity_threshold: Duration,
205
206    /// Whether monitoring is active
207    active: Arc<RwLock<bool>>,
208}
209
210impl SessionMonitor {
211    /// Create a new session monitor
212    pub fn new() -> Self {
213        let (event_sender, _) = broadcast::channel(100);
214
215        Self {
216            process_monitor: Arc::new(RwLock::new(ProcessMonitor::new())),
217            event_sender,
218            last_activity: Arc::new(RwLock::new(HashMap::new())),
219            previous_states: Arc::new(RwLock::new(HashMap::new())),
220            polling_interval: Duration::from_secs(5),
221            inactivity_threshold: Duration::from_secs(300),
222            active: Arc::new(RwLock::new(false)),
223        }
224    }
225
226    /// Set polling interval
227    pub fn with_polling_interval(mut self, interval: Duration) -> Self {
228        self.polling_interval = interval;
229        self
230    }
231
232    /// Set inactivity threshold
233    pub fn with_inactivity_threshold(mut self, threshold: Duration) -> Self {
234        self.inactivity_threshold = threshold;
235        self
236    }
237
238    /// Subscribe to monitor events
239    pub fn subscribe(&self) -> broadcast::Receiver<MonitorEvent> {
240        self.event_sender.subscribe()
241    }
242
243    /// Start monitoring for specific agent types
244    pub async fn start_monitoring(&self, agent_types: Vec<AgentType>) {
245        let mut active = self.active.write().await;
246        if *active {
247            return;
248        }
249        *active = true;
250        drop(active);
251
252        let process_monitor = self.process_monitor.clone();
253        let event_sender = self.event_sender.clone();
254        let last_activity = self.last_activity.clone();
255        let previous_states = self.previous_states.clone();
256        let polling_interval = self.polling_interval;
257        let inactivity_threshold = self.inactivity_threshold;
258        let active_flag = self.active.clone();
259
260        tokio::spawn(async move {
261            let mut ticker = interval(polling_interval);
262
263            loop {
264                ticker.tick().await;
265
266                // Check if still active
267                {
268                    let a = active_flag.read().await;
269                    if !*a {
270                        break;
271                    }
272                }
273
274                let mut monitor = process_monitor.write().await;
275                monitor.refresh();
276
277                for agent_type in &agent_types {
278                    let processes = monitor.find_agent_processes(*agent_type);
279                    let is_active = !processes.is_empty();
280
281                    // Get previous state
282                    let was_active = {
283                        let states = previous_states.read().await;
284                        states
285                            .get(&agent_type.to_string())
286                            .copied()
287                            .unwrap_or(false)
288                    };
289
290                    // Update activity time
291                    if is_active {
292                        let mut activity = last_activity.write().await;
293                        activity.insert(agent_type.to_string(), Utc::now());
294
295                        // Emit process detected events
296                        for process in &processes {
297                            let _ = event_sender.send(MonitorEvent::ProcessDetected {
298                                agent_type: agent_type.to_string(),
299                                process: process.clone(),
300                            });
301                        }
302                    }
303
304                    // Check for state change
305                    if was_active && !is_active {
306                        // Session ended
307                        let _ = event_sender.send(MonitorEvent::SessionEnded {
308                            agent_type: agent_type.to_string(),
309                            reason: "process_terminated".to_string(),
310                            timestamp: Utc::now(),
311                        });
312                    } else if !was_active && is_active {
313                        // Session started
314                        let _ = event_sender.send(MonitorEvent::SessionStarted {
315                            agent_type: agent_type.to_string(),
316                            pid: processes.first().map(|p| p.pid),
317                            timestamp: Utc::now(),
318                        });
319                    }
320
321                    // Check for inactivity
322                    if !is_active {
323                        let activity = last_activity.read().await;
324                        if let Some(last) = activity.get(&agent_type.to_string()) {
325                            let elapsed = (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO);
326
327                            if elapsed > inactivity_threshold {
328                                let _ = event_sender.send(MonitorEvent::InactivityDetected {
329                                    agent_type: agent_type.to_string(),
330                                    inactive_for_secs: elapsed.as_secs(),
331                                });
332                            }
333                        }
334                    }
335
336                    // Update previous state
337                    {
338                        let mut states = previous_states.write().await;
339                        states.insert(agent_type.to_string(), is_active);
340                    }
341                }
342            }
343        });
344    }
345
346    /// Stop monitoring
347    pub async fn stop_monitoring(&self) {
348        let mut active = self.active.write().await;
349        *active = false;
350    }
351
352    /// Record activity for an agent
353    pub async fn record_activity(&self, agent_type: &str) {
354        let mut activity = self.last_activity.write().await;
355        activity.insert(agent_type.to_string(), Utc::now());
356    }
357
358    /// Check if agent has been inactive
359    pub async fn is_inactive(&self, agent_type: &str) -> bool {
360        let activity = self.last_activity.read().await;
361
362        if let Some(last) = activity.get(agent_type) {
363            let elapsed = (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO);
364            elapsed > self.inactivity_threshold
365        } else {
366            false
367        }
368    }
369
370    /// Get inactive duration for an agent
371    pub async fn get_inactive_duration(&self, agent_type: &str) -> Option<Duration> {
372        let activity = self.last_activity.read().await;
373
374        activity
375            .get(agent_type)
376            .map(|last| (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO))
377    }
378
379    /// Detect session activity for an agent
380    pub async fn detect_activity(&self, agent_type: AgentType) -> Result<SessionActivity> {
381        let mut monitor = self.process_monitor.write().await;
382        let processes = monitor.find_agent_processes(agent_type);
383
384        let mut activity = SessionActivity::new(agent_type);
385
386        if !processes.is_empty() {
387            activity.is_active = true;
388            activity.processes = processes;
389        }
390
391        Ok(activity)
392    }
393}
394
395impl Default for SessionMonitor {
396    fn default() -> Self {
397        Self::new()
398    }
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created process monitor tracks nothing.
    #[test]
    fn test_process_monitor_new() {
        let fresh = ProcessMonitor::new();
        assert!(fresh.tracked_pids.is_empty());
    }

    /// Tracking a PID makes it appear in the tracked set.
    #[test]
    fn test_process_monitor_track() {
        let mut procs = ProcessMonitor::new();
        procs.track_process(1234, AgentType::ClaudeCode);

        let is_tracked = procs.tracked_pids.contains_key(&1234);
        assert!(is_tracked);
    }

    /// Untracking a PID removes it from the tracked set again.
    #[test]
    fn test_process_monitor_untrack() {
        let mut procs = ProcessMonitor::new();
        procs.track_process(1234, AgentType::ClaudeCode);
        procs.untrack_process(1234);

        let is_tracked = procs.tracked_pids.contains_key(&1234);
        assert!(!is_tracked);
    }

    /// A new session monitor starts out inactive.
    #[tokio::test]
    async fn test_session_monitor_new() {
        let sessions = SessionMonitor::new();
        let is_running = *sessions.active.read().await;
        assert!(!is_running);
    }

    /// Recording activity yields a small, known inactive duration.
    #[tokio::test]
    async fn test_session_monitor_record_activity() {
        let sessions = SessionMonitor::new();
        sessions.record_activity("claude-code").await;

        let idle = sessions.get_inactive_duration("claude-code").await;
        assert!(matches!(idle, Some(d) if d < Duration::from_secs(1)));
    }

    /// A subscriber receives events sent after it subscribed.
    #[tokio::test]
    async fn test_session_monitor_subscribe() {
        let sessions = SessionMonitor::new();
        let mut inbox = sessions.subscribe();

        // Push one event through the channel by hand.
        let started = MonitorEvent::SessionStarted {
            agent_type: "test".to_string(),
            pid: None,
            timestamp: Utc::now(),
        };
        let _ = sessions.event_sender.send(started);

        // The subscriber must see it without waiting.
        assert!(inbox.try_recv().is_ok());
    }
}
460}