Skip to main content

missiond_core/cc_tasks/
watcher.rs

1//! Claude Code Tasks Watcher
2//!
3//! Monitors Claude Code sessions and their tasks in real-time
4
5use super::parser::*;
6use super::types::*;
7use chrono::{Duration, Utc};
8use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::sync::{broadcast, RwLock};
13use tracing::{debug, error, info, warn};
14
/// Configuration for [`CCTasksWatcher`].
///
/// Use [`Default::default`] and override individual fields as needed.
#[derive(Debug, Clone)]
pub struct CCTasksWatcherOptions {
    /// Claude home directory, defaults to ~/.claude
    pub claude_home: Option<PathBuf>,
    /// How often to check for inactive sessions (ms), default 60000
    pub inactive_check_interval_ms: u64,
    /// Max recent changes to keep, default 50
    pub max_recent_changes: usize,
}

impl Default for CCTasksWatcherOptions {
    /// No explicit Claude home, a 60 second inactivity check and a history of
    /// 50 change events.
    fn default() -> Self {
        Self {
            claude_home: None,
            inactive_check_interval_ms: 60_000,
            max_recent_changes: 50,
        }
    }
}
34
35/// Watcher events
36#[derive(Debug, Clone)]
37pub enum WatcherEvent {
38    TasksChanged(CCTaskChangeEvent),
39    TaskStarted { session: CCSession, task: CCTask },
40    TaskCompleted { session: CCSession, task: CCTask },
41    SessionActive(CCSession),
42    SessionInactive(CCSession),
43}
44
45/// Claude Code Tasks Watcher
46pub struct CCTasksWatcher {
47    projects_dir: PathBuf,
48    sessions: Arc<RwLock<HashMap<String, CCSession>>>,
49    file_positions: Arc<RwLock<HashMap<String, u64>>>,
50    recent_changes: Arc<RwLock<Vec<CCTaskChangeEvent>>>,
51    max_recent_changes: usize,
52    inactive_check_interval_ms: u64,
53    event_tx: broadcast::Sender<WatcherEvent>,
54    watcher: Option<RecommendedWatcher>,
55    started: Arc<RwLock<bool>>,
56}
57
58impl CCTasksWatcher {
59    /// Create a new watcher
60    pub fn new(options: CCTasksWatcherOptions) -> Self {
61        let claude_home = options.claude_home.unwrap_or_else(|| {
62            dirs::home_dir()
63                .map(|h| h.join(".claude"))
64                .unwrap_or_else(|| PathBuf::from("~/.claude"))
65        });
66        let projects_dir = claude_home.join("projects");
67
68        let (event_tx, _) = broadcast::channel(100);
69
70        Self {
71            projects_dir,
72            sessions: Arc::new(RwLock::new(HashMap::new())),
73            file_positions: Arc::new(RwLock::new(HashMap::new())),
74            recent_changes: Arc::new(RwLock::new(Vec::new())),
75            max_recent_changes: options.max_recent_changes,
76            inactive_check_interval_ms: options.inactive_check_interval_ms,
77            event_tx,
78            watcher: None,
79            started: Arc::new(RwLock::new(false)),
80        }
81    }
82
83    /// Subscribe to watcher events
84    pub fn subscribe(&self) -> broadcast::Receiver<WatcherEvent> {
85        self.event_tx.subscribe()
86    }
87
88    /// Start watching Claude Code sessions
89    pub async fn start(&mut self) -> anyhow::Result<()> {
90        {
91            let mut started = self.started.write().await;
92            if *started {
93                return Ok(());
94            }
95            *started = true;
96        }
97
98        info!(projects_dir = ?self.projects_dir, "Starting CCTasksWatcher");
99
100        // Initial scan
101        self.scan_all_projects().await;
102
103        // Setup file watcher
104        let (tx, mut rx) = tokio::sync::mpsc::channel(100);
105
106        let watcher_tx = tx.clone();
107        let mut watcher = RecommendedWatcher::new(
108            move |res: Result<Event, notify::Error>| {
109                if let Ok(event) = res {
110                    let _ = watcher_tx.blocking_send(event);
111                }
112            },
113            Config::default(),
114        )?;
115
116        watcher.watch(&self.projects_dir, RecursiveMode::Recursive)?;
117        self.watcher = Some(watcher);
118
119        // Spawn event handler
120        let sessions = self.sessions.clone();
121        let file_positions = self.file_positions.clone();
122        let recent_changes = self.recent_changes.clone();
123        let max_recent_changes = self.max_recent_changes;
124        let event_tx = self.event_tx.clone();
125        let projects_dir = self.projects_dir.clone();
126        let started = self.started.clone();
127
128        tokio::spawn(async move {
129            while let Some(event) = rx.recv().await {
130                if !*started.read().await {
131                    break;
132                }
133
134                for path in event.paths {
135                    let path_str = path.to_string_lossy();
136
137                    // Skip non-relevant files
138                    if path_str.contains("tool-results") || path_str.contains("session-memory") {
139                        continue;
140                    }
141
142                    if path.ends_with("sessions-index.json") {
143                        Self::load_project_sessions_static(
144                            &path,
145                            &sessions,
146                            &file_positions,
147                            &event_tx,
148                        )
149                        .await;
150                    } else if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
151                        Self::check_session_for_changes_static(
152                            &path,
153                            &sessions,
154                            &recent_changes,
155                            max_recent_changes,
156                            &event_tx,
157                            &projects_dir,
158                            &file_positions,
159                        )
160                        .await;
161                    }
162                }
163            }
164        });
165
166        // Spawn inactive check timer
167        let sessions = self.sessions.clone();
168        let event_tx = self.event_tx.clone();
169        let inactive_check_interval = self.inactive_check_interval_ms;
170        let started = self.started.clone();
171
172        tokio::spawn(async move {
173            let mut interval =
174                tokio::time::interval(std::time::Duration::from_millis(inactive_check_interval));
175
176            loop {
177                interval.tick().await;
178
179                if !*started.read().await {
180                    break;
181                }
182
183                Self::check_inactive_sessions_static(&sessions, &event_tx).await;
184            }
185        });
186
187        info!("CCTasksWatcher started");
188        Ok(())
189    }
190
191    /// Stop watching
192    pub async fn stop(&mut self) {
193        {
194            let mut started = self.started.write().await;
195            if !*started {
196                return;
197            }
198            *started = false;
199        }
200
201        self.watcher = None;
202        self.sessions.write().await.clear();
203        self.file_positions.write().await.clear();
204        self.recent_changes.write().await.clear();
205
206        info!("CCTasksWatcher stopped");
207    }
208
209    /// Scan all projects directories
210    async fn scan_all_projects(&self) {
211        if !self.projects_dir.exists() {
212            warn!(projects_dir = ?self.projects_dir, "Projects directory not found");
213            return;
214        }
215
216        let mut entries = match tokio::fs::read_dir(&self.projects_dir).await {
217            Ok(e) => e,
218            Err(e) => {
219                error!(?e, "Failed to read projects directory");
220                return;
221            }
222        };
223
224        while let Ok(Some(entry)) = entries.next_entry().await {
225            let path = entry.path();
226            if !path.is_dir() {
227                continue;
228            }
229
230            let index_path = path.join("sessions-index.json");
231            Self::load_project_sessions_static(
232                &index_path,
233                &self.sessions,
234                &self.file_positions,
235                &self.event_tx,
236            )
237            .await;
238        }
239
240        let session_count = self.sessions.read().await.len();
241        info!(session_count, "Initial scan complete");
242    }
243
244    /// Load sessions from a project's sessions-index.json (static version)
245    async fn load_project_sessions_static(
246        index_path: &Path,
247        sessions: &Arc<RwLock<HashMap<String, CCSession>>>,
248        file_positions: &Arc<RwLock<HashMap<String, u64>>>,
249        _event_tx: &broadcast::Sender<WatcherEvent>,
250    ) {
251        let index = match parse_sessions_index(index_path).await {
252            Some(i) => i,
253            None => return,
254        };
255
256        for entry in index.entries {
257            let session = entry_to_session(&entry).await;
258            let mut sessions = sessions.write().await;
259
260            if !sessions.contains_key(&session.session_id) {
261                let file_size = get_file_size(Path::new(&session.full_path)).await;
262                file_positions
263                    .write()
264                    .await
265                    .insert(session.full_path.clone(), file_size);
266            }
267
268            sessions.insert(session.session_id.clone(), session);
269        }
270    }
271
272    /// Check a session file for task changes (static version)
273    async fn check_session_for_changes_static(
274        file_path: &Path,
275        sessions: &Arc<RwLock<HashMap<String, CCSession>>>,
276        recent_changes: &Arc<RwLock<Vec<CCTaskChangeEvent>>>,
277        max_recent_changes: usize,
278        event_tx: &broadcast::Sender<WatcherEvent>,
279        _projects_dir: &Path,
280        file_positions: &Arc<RwLock<HashMap<String, u64>>>,
281    ) {
282        let file_path_str = file_path.to_string_lossy().to_string();
283
284        // Find session by file path
285        let session_opt = {
286            let sessions = sessions.read().await;
287            sessions
288                .values()
289                .find(|s| s.full_path == file_path_str)
290                .cloned()
291        };
292
293        let mut session = match session_opt {
294            Some(s) => s,
295            None => {
296                // New session file, try to load from index
297                if let Some(parent) = file_path.parent() {
298                    let index_path = parent.join("sessions-index.json");
299                    Self::load_project_sessions_static(
300                        &index_path,
301                        sessions,
302                        file_positions,
303                        event_tx,
304                    )
305                    .await;
306                }
307                return;
308            }
309        };
310
311        // Get previous tasks
312        let previous_tasks = session.tasks.clone();
313
314        // Parse new tasks
315        let current_tasks = parse_last_todos(file_path, 100).await;
316
317        // Check for changes
318        let diff = diff_tasks(&previous_tasks, &current_tasks);
319
320        if !diff.is_empty() {
321            let was_active = session.is_active;
322            session.tasks = current_tasks.clone();
323            session.modified = Utc::now();
324            session.is_active = true;
325
326            // Create change event
327            let change_event = CCTaskChangeEvent {
328                session_id: session.session_id.clone(),
329                project_path: session.project_path.clone(),
330                project_name: session.project_name.clone(),
331                previous_tasks,
332                current_tasks,
333                added: diff.added.clone(),
334                removed: diff.removed,
335                status_changed: diff.status_changed.clone(),
336                timestamp: Utc::now(),
337            };
338
339            // Add to recent changes
340            {
341                let mut changes = recent_changes.write().await;
342                changes.insert(0, change_event.clone());
343                if changes.len() > max_recent_changes {
344                    changes.pop();
345                }
346            }
347
348            // Update session
349            sessions
350                .write()
351                .await
352                .insert(session.session_id.clone(), session.clone());
353
354            // Emit events
355            let _ = event_tx.send(WatcherEvent::TasksChanged(change_event));
356
357            // Emit specific events for started/completed tasks
358            for change in &diff.status_changed {
359                if change.task.status == CCTaskStatus::InProgress
360                    && change.previous_status == CCTaskStatus::Pending
361                {
362                    let _ = event_tx.send(WatcherEvent::TaskStarted {
363                        session: session.clone(),
364                        task: change.task.clone(),
365                    });
366                } else if change.task.status == CCTaskStatus::Completed {
367                    let _ = event_tx.send(WatcherEvent::TaskCompleted {
368                        session: session.clone(),
369                        task: change.task.clone(),
370                    });
371                }
372            }
373
374            // Emit session active if it wasn't before
375            if !was_active {
376                let _ = event_tx.send(WatcherEvent::SessionActive(session));
377            }
378
379            debug!(
380                added = diff.added.len(),
381                status_changed = diff.status_changed.len(),
382                "Tasks changed"
383            );
384        }
385    }
386
387    /// Check for inactive sessions (static version)
388    async fn check_inactive_sessions_static(
389        sessions: &Arc<RwLock<HashMap<String, CCSession>>>,
390        event_tx: &broadcast::Sender<WatcherEvent>,
391    ) {
392        let now = Utc::now();
393        let five_minutes_ago = now - Duration::minutes(5);
394
395        let mut sessions = sessions.write().await;
396        for session in sessions.values_mut() {
397            if session.is_active && session.modified < five_minutes_ago {
398                session.is_active = false;
399                let _ = event_tx.send(WatcherEvent::SessionInactive(session.clone()));
400            }
401        }
402    }
403
404    // ============ Public API ============
405
406    /// Get all sessions
407    pub async fn get_all_sessions(&self) -> Vec<CCSession> {
408        self.sessions.read().await.values().cloned().collect()
409    }
410
411    /// Get active sessions (updated within last 5 minutes)
412    pub async fn get_active_sessions(&self) -> Vec<CCSession> {
413        self.sessions
414            .read()
415            .await
416            .values()
417            .filter(|s| s.is_active)
418            .cloned()
419            .collect()
420    }
421
422    /// Get sessions by project path
423    pub async fn get_sessions_by_project(&self, project_path: &str) -> Vec<CCSession> {
424        self.sessions
425            .read()
426            .await
427            .values()
428            .filter(|s| s.project_path.contains(project_path) || s.project_name.contains(project_path))
429            .cloned()
430            .collect()
431    }
432
433    /// Get a specific session by ID
434    pub async fn get_session(&self, session_id: &str) -> Option<CCSession> {
435        self.sessions.read().await.get(session_id).cloned()
436    }
437
438    /// Get tasks for a specific session
439    pub async fn get_session_tasks(&self, session_id: &str) -> Option<Vec<CCTask>> {
440        self.sessions
441            .read()
442            .await
443            .get(session_id)
444            .map(|s| s.tasks.clone())
445    }
446
447    /// Get all in-progress tasks across all sessions
448    pub async fn get_in_progress_tasks(&self) -> Vec<CCInProgressTask> {
449        let sessions = self.sessions.read().await;
450        let mut result = Vec::new();
451
452        for session in sessions.values() {
453            for task in &session.tasks {
454                if task.status == CCTaskStatus::InProgress {
455                    result.push(CCInProgressTask {
456                        session_id: session.session_id.clone(),
457                        project_path: session.project_path.clone(),
458                        project_name: session.project_name.clone(),
459                        summary: session.summary.clone(),
460                        task: task.clone(),
461                        modified: session.modified,
462                    });
463                }
464            }
465        }
466
467        // Sort by most recently modified
468        result.sort_by(|a, b| b.modified.cmp(&a.modified));
469        result
470    }
471
472    /// Get tasks overview statistics
473    pub async fn get_overview(&self) -> CCTasksOverview {
474        let sessions: Vec<_> = self.sessions.read().await.values().cloned().collect();
475        let mut pending = 0;
476        let mut in_progress = 0;
477        let mut completed = 0;
478        let mut sessions_with_tasks = 0;
479
480        for session in &sessions {
481            if !session.tasks.is_empty() {
482                sessions_with_tasks += 1;
483            }
484            for task in &session.tasks {
485                match task.status {
486                    CCTaskStatus::Pending => pending += 1,
487                    CCTaskStatus::InProgress => in_progress += 1,
488                    CCTaskStatus::Completed => completed += 1,
489                }
490            }
491        }
492
493        let recent_changes: Vec<_> = self
494            .recent_changes
495            .read()
496            .await
497            .iter()
498            .take(10)
499            .cloned()
500            .collect();
501
502        CCTasksOverview {
503            total_sessions: sessions.len(),
504            active_sessions: sessions.iter().filter(|s| s.is_active).count(),
505            tasks_by_status: TasksByStatus {
506                pending,
507                in_progress,
508                completed,
509            },
510            sessions_with_tasks,
511            recent_changes,
512        }
513    }
514
515    /// Get recent task changes
516    pub async fn get_recent_changes(&self, limit: usize) -> Vec<CCTaskChangeEvent> {
517        self.recent_changes
518            .read()
519            .await
520            .iter()
521            .take(limit)
522            .cloned()
523            .collect()
524    }
525
526    /// Force refresh a specific session
527    pub async fn refresh_session(&self, session_id: &str) -> Option<CCSession> {
528        let mut sessions = self.sessions.write().await;
529        let session = sessions.get_mut(session_id)?;
530
531        let tasks = parse_last_todos(Path::new(&session.full_path), 100).await;
532        session.tasks = tasks;
533        session.modified = Utc::now();
534
535        Some(session.clone())
536    }
537
538    /// Force refresh all sessions
539    pub async fn refresh_all(&self) {
540        // Re-scan all projects
541        if !self.projects_dir.exists() {
542            return;
543        }
544
545        let mut entries = match tokio::fs::read_dir(&self.projects_dir).await {
546            Ok(e) => e,
547            Err(_) => return,
548        };
549
550        while let Ok(Some(entry)) = entries.next_entry().await {
551            let path = entry.path();
552            if !path.is_dir() {
553                continue;
554            }
555
556            let index_path = path.join("sessions-index.json");
557            Self::load_project_sessions_static(
558                &index_path,
559                &self.sessions,
560                &self.file_positions,
561                &self.event_tx,
562            )
563            .await;
564        }
565    }
566}