1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11use sysinfo::{Pid, ProcessStatus, System};
12use tokio::sync::{broadcast, RwLock};
13use tokio::time::interval;
14
15use crate::error::Result;
16use crate::types::{AgentType, ProcessInfo, SessionActivity};
17
18pub struct ProcessMonitor {
20 system: System,
22
23 tracked_pids: HashMap<u32, AgentType>,
25
26 previous_states: HashMap<String, bool>,
28}
29
30impl Clone for ProcessMonitor {
31 fn clone(&self) -> Self {
32 Self {
33 system: System::new_all(),
34 tracked_pids: self.tracked_pids.clone(),
35 previous_states: self.previous_states.clone(),
36 }
37 }
38}
39
40impl ProcessMonitor {
41 pub fn new() -> Self {
43 Self {
44 system: System::new_all(),
45 tracked_pids: HashMap::new(),
46 previous_states: HashMap::new(),
47 }
48 }
49
50 pub fn refresh(&mut self) {
52 self.system.refresh_all();
53 }
54
55 pub fn track_process(&mut self, pid: u32, agent_type: AgentType) {
57 self.tracked_pids.insert(pid, agent_type);
58 }
59
60 pub fn untrack_process(&mut self, pid: u32) {
62 self.tracked_pids.remove(&pid);
63 }
64
65 pub fn is_process_alive(&self, pid: u32) -> bool {
67 self.system
68 .process(Pid::from(pid as usize))
69 .map(|p| p.status() == ProcessStatus::Run)
70 .unwrap_or(false)
71 }
72
73 pub fn get_process_info(&self, pid: u32) -> Option<ProcessInfo> {
75 self.system
76 .process(Pid::from(pid as usize))
77 .map(|p| ProcessInfo {
78 pid,
79 name: p.name().to_string_lossy().to_string(),
80 status: format!("{:?}", p.status()),
81 command: Some(
82 p.cmd()
83 .iter()
84 .map(|s| s.to_string_lossy().to_string())
85 .collect::<Vec<_>>()
86 .join(" "),
87 ),
88 working_dir: p.cwd().map(|p| p.to_string_lossy().to_string()),
89 create_time: chrono::DateTime::from_timestamp(p.start_time() as i64, 0),
90 cpu_percent: Some(p.cpu_usage()),
91 memory_bytes: Some(p.memory()),
92 })
93 }
94
95 pub fn find_agent_processes(&mut self, agent_type: AgentType) -> Vec<ProcessInfo> {
97 self.refresh();
98
99 let process_names = agent_type.process_names();
100 let mut found = Vec::new();
101
102 for process in self.system.processes().values() {
103 let name = process.name().to_string_lossy().to_lowercase();
104 let cmd = process
105 .cmd()
106 .iter()
107 .map(|s| s.to_string_lossy().to_lowercase())
108 .collect::<Vec<_>>()
109 .join(" ");
110
111 for pattern in process_names {
112 if name.contains(pattern) || cmd.contains(pattern) {
113 if let Some(info) = self.get_process_info(process.pid().as_u32()) {
114 found.push(info);
115 }
116 break;
117 }
118 }
119 }
120
121 found
122 }
123
124 pub fn check_terminated(&mut self) -> Vec<(u32, AgentType)> {
126 self.refresh();
127
128 let mut terminated = Vec::new();
129
130 for (pid, agent_type) in &self.tracked_pids {
131 if !self.is_process_alive(*pid) {
132 terminated.push((*pid, *agent_type));
133 }
134 }
135
136 for (pid, _) in &terminated {
138 self.tracked_pids.remove(pid);
139 }
140
141 terminated
142 }
143}
144
145impl Default for ProcessMonitor {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
153pub enum MonitorEvent {
154 SessionStarted {
156 agent_type: String,
157 pid: Option<u32>,
158 timestamp: DateTime<Utc>,
159 },
160
161 SessionEnded {
163 agent_type: String,
164 reason: String,
165 timestamp: DateTime<Utc>,
166 },
167
168 ProcessDetected {
170 agent_type: String,
171 process: ProcessInfo,
172 },
173
174 ProcessTerminated { agent_type: String, pid: u32 },
176
177 InactivityDetected {
179 agent_type: String,
180 inactive_for_secs: u64,
181 },
182}
183
184pub struct SessionMonitor {
188 process_monitor: Arc<RwLock<ProcessMonitor>>,
190
191 event_sender: broadcast::Sender<MonitorEvent>,
193
194 last_activity: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
196
197 previous_states: Arc<RwLock<HashMap<String, bool>>>,
199
200 polling_interval: Duration,
202
203 inactivity_threshold: Duration,
205
206 active: Arc<RwLock<bool>>,
208}
209
210impl SessionMonitor {
211 pub fn new() -> Self {
213 let (event_sender, _) = broadcast::channel(100);
214
215 Self {
216 process_monitor: Arc::new(RwLock::new(ProcessMonitor::new())),
217 event_sender,
218 last_activity: Arc::new(RwLock::new(HashMap::new())),
219 previous_states: Arc::new(RwLock::new(HashMap::new())),
220 polling_interval: Duration::from_secs(5),
221 inactivity_threshold: Duration::from_secs(300),
222 active: Arc::new(RwLock::new(false)),
223 }
224 }
225
226 pub fn with_polling_interval(mut self, interval: Duration) -> Self {
228 self.polling_interval = interval;
229 self
230 }
231
232 pub fn with_inactivity_threshold(mut self, threshold: Duration) -> Self {
234 self.inactivity_threshold = threshold;
235 self
236 }
237
238 pub fn subscribe(&self) -> broadcast::Receiver<MonitorEvent> {
240 self.event_sender.subscribe()
241 }
242
243 pub async fn start_monitoring(&self, agent_types: Vec<AgentType>) {
245 let mut active = self.active.write().await;
246 if *active {
247 return;
248 }
249 *active = true;
250 drop(active);
251
252 let process_monitor = self.process_monitor.clone();
253 let event_sender = self.event_sender.clone();
254 let last_activity = self.last_activity.clone();
255 let previous_states = self.previous_states.clone();
256 let polling_interval = self.polling_interval;
257 let inactivity_threshold = self.inactivity_threshold;
258 let active_flag = self.active.clone();
259
260 tokio::spawn(async move {
261 let mut ticker = interval(polling_interval);
262
263 loop {
264 ticker.tick().await;
265
266 {
268 let a = active_flag.read().await;
269 if !*a {
270 break;
271 }
272 }
273
274 let mut monitor = process_monitor.write().await;
275 monitor.refresh();
276
277 for agent_type in &agent_types {
278 let processes = monitor.find_agent_processes(*agent_type);
279 let is_active = !processes.is_empty();
280
281 let was_active = {
283 let states = previous_states.read().await;
284 states
285 .get(&agent_type.to_string())
286 .copied()
287 .unwrap_or(false)
288 };
289
290 if is_active {
292 let mut activity = last_activity.write().await;
293 activity.insert(agent_type.to_string(), Utc::now());
294
295 for process in &processes {
297 let _ = event_sender.send(MonitorEvent::ProcessDetected {
298 agent_type: agent_type.to_string(),
299 process: process.clone(),
300 });
301 }
302 }
303
304 if was_active && !is_active {
306 let _ = event_sender.send(MonitorEvent::SessionEnded {
308 agent_type: agent_type.to_string(),
309 reason: "process_terminated".to_string(),
310 timestamp: Utc::now(),
311 });
312 } else if !was_active && is_active {
313 let _ = event_sender.send(MonitorEvent::SessionStarted {
315 agent_type: agent_type.to_string(),
316 pid: processes.first().map(|p| p.pid),
317 timestamp: Utc::now(),
318 });
319 }
320
321 if !is_active {
323 let activity = last_activity.read().await;
324 if let Some(last) = activity.get(&agent_type.to_string()) {
325 let elapsed = (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO);
326
327 if elapsed > inactivity_threshold {
328 let _ = event_sender.send(MonitorEvent::InactivityDetected {
329 agent_type: agent_type.to_string(),
330 inactive_for_secs: elapsed.as_secs(),
331 });
332 }
333 }
334 }
335
336 {
338 let mut states = previous_states.write().await;
339 states.insert(agent_type.to_string(), is_active);
340 }
341 }
342 }
343 });
344 }
345
346 pub async fn stop_monitoring(&self) {
348 let mut active = self.active.write().await;
349 *active = false;
350 }
351
352 pub async fn record_activity(&self, agent_type: &str) {
354 let mut activity = self.last_activity.write().await;
355 activity.insert(agent_type.to_string(), Utc::now());
356 }
357
358 pub async fn is_inactive(&self, agent_type: &str) -> bool {
360 let activity = self.last_activity.read().await;
361
362 if let Some(last) = activity.get(agent_type) {
363 let elapsed = (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO);
364 elapsed > self.inactivity_threshold
365 } else {
366 false
367 }
368 }
369
370 pub async fn get_inactive_duration(&self, agent_type: &str) -> Option<Duration> {
372 let activity = self.last_activity.read().await;
373
374 activity
375 .get(agent_type)
376 .map(|last| (Utc::now() - *last).to_std().unwrap_or(Duration::ZERO))
377 }
378
379 pub async fn detect_activity(&self, agent_type: AgentType) -> Result<SessionActivity> {
381 let mut monitor = self.process_monitor.write().await;
382 let processes = monitor.find_agent_processes(agent_type);
383
384 let mut activity = SessionActivity::new(agent_type);
385
386 if !processes.is_empty() {
387 activity.is_active = true;
388 activity.processes = processes;
389 }
390
391 Ok(activity)
392 }
393}
394
395impl Default for SessionMonitor {
396 fn default() -> Self {
397 Self::new()
398 }
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// A new process monitor tracks nothing.
    #[test]
    fn test_process_monitor_new() {
        let tracked = ProcessMonitor::new().tracked_pids;
        assert!(tracked.is_empty());
    }

    /// Tracking a PID makes it show up in the tracked set.
    #[test]
    fn test_process_monitor_track() {
        const PID: u32 = 1234;

        let mut monitor = ProcessMonitor::new();
        monitor.track_process(PID, AgentType::ClaudeCode);

        assert!(monitor.tracked_pids.contains_key(&PID));
    }

    /// Untracking a PID removes it from the tracked set again.
    #[test]
    fn test_process_monitor_untrack() {
        const PID: u32 = 1234;

        let mut monitor = ProcessMonitor::new();
        monitor.track_process(PID, AgentType::ClaudeCode);
        monitor.untrack_process(PID);

        assert!(!monitor.tracked_pids.contains_key(&PID));
    }

    /// A new session monitor starts out inactive.
    #[tokio::test]
    async fn test_session_monitor_new() {
        let monitor = SessionMonitor::new();
        let is_active = *monitor.active.read().await;
        assert!(!is_active);
    }

    /// Recording activity yields a very small inactive duration right after.
    #[tokio::test]
    async fn test_session_monitor_record_activity() {
        let monitor = SessionMonitor::new();
        monitor.record_activity("claude-code").await;

        let duration = monitor.get_inactive_duration("claude-code").await;
        assert!(matches!(duration, Some(elapsed) if elapsed < Duration::from_secs(1)));
    }

    /// An event sent after subscribing is delivered to the receiver.
    #[tokio::test]
    async fn test_session_monitor_subscribe() {
        let monitor = SessionMonitor::new();
        let mut receiver = monitor.subscribe();

        let started = MonitorEvent::SessionStarted {
            agent_type: "test".to_string(),
            pid: None,
            timestamp: Utc::now(),
        };
        let _ = monitor.event_sender.send(started);

        assert!(receiver.try_recv().is_ok());
    }
}