1use super::parser::*;
6use super::types::*;
7use chrono::{Duration, Utc};
8use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::sync::{broadcast, RwLock};
13use tracing::{debug, error, info, warn};
14
/// Configuration accepted by `CCTasksWatcher::new`.
///
/// `Debug` and `Clone` are derived so the options can be logged and reused
/// to build more than one watcher.
#[derive(Debug, Clone)]
pub struct CCTasksWatcherOptions {
    /// Root of the Claude data directory. When `None`, the watcher falls back
    /// to `.claude` inside the current user's home directory.
    pub claude_home: Option<PathBuf>,
    /// Period, in milliseconds, between two inactivity sweeps.
    pub inactive_check_interval_ms: u64,
    /// Upper bound on the number of task-change events kept in memory.
    pub max_recent_changes: usize,
}
24
25impl Default for CCTasksWatcherOptions {
26 fn default() -> Self {
27 Self {
28 claude_home: None,
29 inactive_check_interval_ms: 60000,
30 max_recent_changes: 50,
31 }
32 }
33}
34
35#[derive(Debug, Clone)]
37pub enum WatcherEvent {
38 TasksChanged(CCTaskChangeEvent),
39 TaskStarted { session: CCSession, task: CCTask },
40 TaskCompleted { session: CCSession, task: CCTask },
41 SessionActive(CCSession),
42 SessionInactive(CCSession),
43}
44
45pub struct CCTasksWatcher {
47 projects_dir: PathBuf,
48 sessions: Arc<RwLock<HashMap<String, CCSession>>>,
49 file_positions: Arc<RwLock<HashMap<String, u64>>>,
50 recent_changes: Arc<RwLock<Vec<CCTaskChangeEvent>>>,
51 max_recent_changes: usize,
52 inactive_check_interval_ms: u64,
53 event_tx: broadcast::Sender<WatcherEvent>,
54 watcher: Option<RecommendedWatcher>,
55 started: Arc<RwLock<bool>>,
56}
57
58impl CCTasksWatcher {
59 pub fn new(options: CCTasksWatcherOptions) -> Self {
61 let claude_home = options.claude_home.unwrap_or_else(|| {
62 dirs::home_dir()
63 .map(|h| h.join(".claude"))
64 .unwrap_or_else(|| PathBuf::from("~/.claude"))
65 });
66 let projects_dir = claude_home.join("projects");
67
68 let (event_tx, _) = broadcast::channel(100);
69
70 Self {
71 projects_dir,
72 sessions: Arc::new(RwLock::new(HashMap::new())),
73 file_positions: Arc::new(RwLock::new(HashMap::new())),
74 recent_changes: Arc::new(RwLock::new(Vec::new())),
75 max_recent_changes: options.max_recent_changes,
76 inactive_check_interval_ms: options.inactive_check_interval_ms,
77 event_tx,
78 watcher: None,
79 started: Arc::new(RwLock::new(false)),
80 }
81 }
82
83 pub fn subscribe(&self) -> broadcast::Receiver<WatcherEvent> {
85 self.event_tx.subscribe()
86 }
87
88 pub async fn start(&mut self) -> anyhow::Result<()> {
90 {
91 let mut started = self.started.write().await;
92 if *started {
93 return Ok(());
94 }
95 *started = true;
96 }
97
98 info!(projects_dir = ?self.projects_dir, "Starting CCTasksWatcher");
99
100 self.scan_all_projects().await;
102
103 let (tx, mut rx) = tokio::sync::mpsc::channel(100);
105
106 let watcher_tx = tx.clone();
107 let mut watcher = RecommendedWatcher::new(
108 move |res: Result<Event, notify::Error>| {
109 if let Ok(event) = res {
110 let _ = watcher_tx.blocking_send(event);
111 }
112 },
113 Config::default(),
114 )?;
115
116 watcher.watch(&self.projects_dir, RecursiveMode::Recursive)?;
117 self.watcher = Some(watcher);
118
119 let sessions = self.sessions.clone();
121 let file_positions = self.file_positions.clone();
122 let recent_changes = self.recent_changes.clone();
123 let max_recent_changes = self.max_recent_changes;
124 let event_tx = self.event_tx.clone();
125 let projects_dir = self.projects_dir.clone();
126 let started = self.started.clone();
127
128 tokio::spawn(async move {
129 while let Some(event) = rx.recv().await {
130 if !*started.read().await {
131 break;
132 }
133
134 for path in event.paths {
135 let path_str = path.to_string_lossy();
136
137 if path_str.contains("tool-results") || path_str.contains("session-memory") {
139 continue;
140 }
141
142 if path.ends_with("sessions-index.json") {
143 Self::load_project_sessions_static(
144 &path,
145 &sessions,
146 &file_positions,
147 &event_tx,
148 )
149 .await;
150 } else if path.extension().map(|e| e == "jsonl").unwrap_or(false) {
151 Self::check_session_for_changes_static(
152 &path,
153 &sessions,
154 &recent_changes,
155 max_recent_changes,
156 &event_tx,
157 &projects_dir,
158 &file_positions,
159 )
160 .await;
161 }
162 }
163 }
164 });
165
166 let sessions = self.sessions.clone();
168 let event_tx = self.event_tx.clone();
169 let inactive_check_interval = self.inactive_check_interval_ms;
170 let started = self.started.clone();
171
172 tokio::spawn(async move {
173 let mut interval =
174 tokio::time::interval(std::time::Duration::from_millis(inactive_check_interval));
175
176 loop {
177 interval.tick().await;
178
179 if !*started.read().await {
180 break;
181 }
182
183 Self::check_inactive_sessions_static(&sessions, &event_tx).await;
184 }
185 });
186
187 info!("CCTasksWatcher started");
188 Ok(())
189 }
190
191 pub async fn stop(&mut self) {
193 {
194 let mut started = self.started.write().await;
195 if !*started {
196 return;
197 }
198 *started = false;
199 }
200
201 self.watcher = None;
202 self.sessions.write().await.clear();
203 self.file_positions.write().await.clear();
204 self.recent_changes.write().await.clear();
205
206 info!("CCTasksWatcher stopped");
207 }
208
209 async fn scan_all_projects(&self) {
211 if !self.projects_dir.exists() {
212 warn!(projects_dir = ?self.projects_dir, "Projects directory not found");
213 return;
214 }
215
216 let mut entries = match tokio::fs::read_dir(&self.projects_dir).await {
217 Ok(e) => e,
218 Err(e) => {
219 error!(?e, "Failed to read projects directory");
220 return;
221 }
222 };
223
224 while let Ok(Some(entry)) = entries.next_entry().await {
225 let path = entry.path();
226 if !path.is_dir() {
227 continue;
228 }
229
230 let index_path = path.join("sessions-index.json");
231 Self::load_project_sessions_static(
232 &index_path,
233 &self.sessions,
234 &self.file_positions,
235 &self.event_tx,
236 )
237 .await;
238 }
239
240 let session_count = self.sessions.read().await.len();
241 info!(session_count, "Initial scan complete");
242 }
243
244 async fn load_project_sessions_static(
246 index_path: &Path,
247 sessions: &Arc<RwLock<HashMap<String, CCSession>>>,
248 file_positions: &Arc<RwLock<HashMap<String, u64>>>,
249 _event_tx: &broadcast::Sender<WatcherEvent>,
250 ) {
251 let index = match parse_sessions_index(index_path).await {
252 Some(i) => i,
253 None => return,
254 };
255
256 for entry in index.entries {
257 let session = entry_to_session(&entry).await;
258 let mut sessions = sessions.write().await;
259
260 if !sessions.contains_key(&session.session_id) {
261 let file_size = get_file_size(Path::new(&session.full_path)).await;
262 file_positions
263 .write()
264 .await
265 .insert(session.full_path.clone(), file_size);
266 }
267
268 sessions.insert(session.session_id.clone(), session);
269 }
270 }
271
272 async fn check_session_for_changes_static(
274 file_path: &Path,
275 sessions: &Arc<RwLock<HashMap<String, CCSession>>>,
276 recent_changes: &Arc<RwLock<Vec<CCTaskChangeEvent>>>,
277 max_recent_changes: usize,
278 event_tx: &broadcast::Sender<WatcherEvent>,
279 _projects_dir: &Path,
280 file_positions: &Arc<RwLock<HashMap<String, u64>>>,
281 ) {
282 let file_path_str = file_path.to_string_lossy().to_string();
283
284 let session_opt = {
286 let sessions = sessions.read().await;
287 sessions
288 .values()
289 .find(|s| s.full_path == file_path_str)
290 .cloned()
291 };
292
293 let mut session = match session_opt {
294 Some(s) => s,
295 None => {
296 if let Some(parent) = file_path.parent() {
298 let index_path = parent.join("sessions-index.json");
299 Self::load_project_sessions_static(
300 &index_path,
301 sessions,
302 file_positions,
303 event_tx,
304 )
305 .await;
306 }
307 return;
308 }
309 };
310
311 let previous_tasks = session.tasks.clone();
313
314 let current_tasks = parse_last_todos(file_path, 100).await;
316
317 let diff = diff_tasks(&previous_tasks, ¤t_tasks);
319
320 if !diff.is_empty() {
321 let was_active = session.is_active;
322 session.tasks = current_tasks.clone();
323 session.modified = Utc::now();
324 session.is_active = true;
325
326 let change_event = CCTaskChangeEvent {
328 session_id: session.session_id.clone(),
329 project_path: session.project_path.clone(),
330 project_name: session.project_name.clone(),
331 previous_tasks,
332 current_tasks,
333 added: diff.added.clone(),
334 removed: diff.removed,
335 status_changed: diff.status_changed.clone(),
336 timestamp: Utc::now(),
337 };
338
339 {
341 let mut changes = recent_changes.write().await;
342 changes.insert(0, change_event.clone());
343 if changes.len() > max_recent_changes {
344 changes.pop();
345 }
346 }
347
348 sessions
350 .write()
351 .await
352 .insert(session.session_id.clone(), session.clone());
353
354 let _ = event_tx.send(WatcherEvent::TasksChanged(change_event));
356
357 for change in &diff.status_changed {
359 if change.task.status == CCTaskStatus::InProgress
360 && change.previous_status == CCTaskStatus::Pending
361 {
362 let _ = event_tx.send(WatcherEvent::TaskStarted {
363 session: session.clone(),
364 task: change.task.clone(),
365 });
366 } else if change.task.status == CCTaskStatus::Completed {
367 let _ = event_tx.send(WatcherEvent::TaskCompleted {
368 session: session.clone(),
369 task: change.task.clone(),
370 });
371 }
372 }
373
374 if !was_active {
376 let _ = event_tx.send(WatcherEvent::SessionActive(session));
377 }
378
379 debug!(
380 added = diff.added.len(),
381 status_changed = diff.status_changed.len(),
382 "Tasks changed"
383 );
384 }
385 }
386
387 async fn check_inactive_sessions_static(
389 sessions: &Arc<RwLock<HashMap<String, CCSession>>>,
390 event_tx: &broadcast::Sender<WatcherEvent>,
391 ) {
392 let now = Utc::now();
393 let five_minutes_ago = now - Duration::minutes(5);
394
395 let mut sessions = sessions.write().await;
396 for session in sessions.values_mut() {
397 if session.is_active && session.modified < five_minutes_ago {
398 session.is_active = false;
399 let _ = event_tx.send(WatcherEvent::SessionInactive(session.clone()));
400 }
401 }
402 }
403
404 pub async fn get_all_sessions(&self) -> Vec<CCSession> {
408 self.sessions.read().await.values().cloned().collect()
409 }
410
411 pub async fn get_active_sessions(&self) -> Vec<CCSession> {
413 self.sessions
414 .read()
415 .await
416 .values()
417 .filter(|s| s.is_active)
418 .cloned()
419 .collect()
420 }
421
422 pub async fn get_sessions_by_project(&self, project_path: &str) -> Vec<CCSession> {
424 self.sessions
425 .read()
426 .await
427 .values()
428 .filter(|s| s.project_path.contains(project_path) || s.project_name.contains(project_path))
429 .cloned()
430 .collect()
431 }
432
433 pub async fn get_session(&self, session_id: &str) -> Option<CCSession> {
435 self.sessions.read().await.get(session_id).cloned()
436 }
437
438 pub async fn get_session_tasks(&self, session_id: &str) -> Option<Vec<CCTask>> {
440 self.sessions
441 .read()
442 .await
443 .get(session_id)
444 .map(|s| s.tasks.clone())
445 }
446
447 pub async fn get_in_progress_tasks(&self) -> Vec<CCInProgressTask> {
449 let sessions = self.sessions.read().await;
450 let mut result = Vec::new();
451
452 for session in sessions.values() {
453 for task in &session.tasks {
454 if task.status == CCTaskStatus::InProgress {
455 result.push(CCInProgressTask {
456 session_id: session.session_id.clone(),
457 project_path: session.project_path.clone(),
458 project_name: session.project_name.clone(),
459 summary: session.summary.clone(),
460 task: task.clone(),
461 modified: session.modified,
462 });
463 }
464 }
465 }
466
467 result.sort_by(|a, b| b.modified.cmp(&a.modified));
469 result
470 }
471
472 pub async fn get_overview(&self) -> CCTasksOverview {
474 let sessions: Vec<_> = self.sessions.read().await.values().cloned().collect();
475 let mut pending = 0;
476 let mut in_progress = 0;
477 let mut completed = 0;
478 let mut sessions_with_tasks = 0;
479
480 for session in &sessions {
481 if !session.tasks.is_empty() {
482 sessions_with_tasks += 1;
483 }
484 for task in &session.tasks {
485 match task.status {
486 CCTaskStatus::Pending => pending += 1,
487 CCTaskStatus::InProgress => in_progress += 1,
488 CCTaskStatus::Completed => completed += 1,
489 }
490 }
491 }
492
493 let recent_changes: Vec<_> = self
494 .recent_changes
495 .read()
496 .await
497 .iter()
498 .take(10)
499 .cloned()
500 .collect();
501
502 CCTasksOverview {
503 total_sessions: sessions.len(),
504 active_sessions: sessions.iter().filter(|s| s.is_active).count(),
505 tasks_by_status: TasksByStatus {
506 pending,
507 in_progress,
508 completed,
509 },
510 sessions_with_tasks,
511 recent_changes,
512 }
513 }
514
515 pub async fn get_recent_changes(&self, limit: usize) -> Vec<CCTaskChangeEvent> {
517 self.recent_changes
518 .read()
519 .await
520 .iter()
521 .take(limit)
522 .cloned()
523 .collect()
524 }
525
526 pub async fn refresh_session(&self, session_id: &str) -> Option<CCSession> {
528 let mut sessions = self.sessions.write().await;
529 let session = sessions.get_mut(session_id)?;
530
531 let tasks = parse_last_todos(Path::new(&session.full_path), 100).await;
532 session.tasks = tasks;
533 session.modified = Utc::now();
534
535 Some(session.clone())
536 }
537
538 pub async fn refresh_all(&self) {
540 if !self.projects_dir.exists() {
542 return;
543 }
544
545 let mut entries = match tokio::fs::read_dir(&self.projects_dir).await {
546 Ok(e) => e,
547 Err(_) => return,
548 };
549
550 while let Ok(Some(entry)) = entries.next_entry().await {
551 let path = entry.path();
552 if !path.is_dir() {
553 continue;
554 }
555
556 let index_path = path.join("sessions-index.json");
557 Self::load_project_sessions_static(
558 &index_path,
559 &self.sessions,
560 &self.file_positions,
561 &self.event_tx,
562 )
563 .await;
564 }
565 }
566}