Skip to main content

chasm/commands/
watch.rs

// Copyright (c) 2024-2026 Nervosys LLC
// SPDX-License-Identifier: AGPL-3.0-only
//! Watch command for file-system monitoring of AI agent session directories
//!
//! Monitors agent session storage paths for new or modified files and
//! automatically harvests them into the chasm database.
7
8use anyhow::{Context, Result};
9use colored::Colorize;
10use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
11use std::collections::HashSet;
12use std::path::PathBuf;
13use std::sync::mpsc;
14use std::time::{Duration, Instant};
15
16use super::run::{resolve_agent, resolve_storage_path, auto_harvest_sessions, AGENTS};
17
18/// Run the watch command — monitor agent session directories for changes
19pub fn watch_cli(
20    agent: Option<&str>,
21    path: Option<&str>,
22    debounce_secs: u64,
23    no_harvest: bool,
24    verbose: bool,
25) -> Result<()> {
26    let home = dirs::home_dir().context("Cannot determine home directory")?;
27
28    // Determine which paths to watch
29    let mut watch_paths: Vec<(String, PathBuf)> = Vec::new();
30
31    if let Some(custom_path) = path {
32        // User specified an explicit path
33        let p = PathBuf::from(custom_path);
34        if !p.exists() {
35            anyhow::bail!("Path does not exist: {}", custom_path);
36        }
37        watch_paths.push(("custom".to_string(), p));
38    } else if let Some(alias) = agent {
39        // Watch a specific agent
40        let config = resolve_agent(alias).ok_or_else(|| {
41            anyhow::anyhow!(
42                "Unknown agent '{}'. Run 'chasm list agents' to see available agents.",
43                alias
44            )
45        })?;
46        if let Some(p) = resolve_storage_path(&home, config) {
47            if p.exists() {
48                watch_paths.push((config.name.to_string(), p));
49            } else {
50                println!(
51                    "{} {} storage path does not exist yet: {}",
52                    "[!]".yellow(),
53                    config.name,
54                    config.storage_hint.dimmed()
55                );
56                println!(
57                    "{} Launch the agent first with 'chasm run {}'",
58                    "[i]".blue(),
59                    alias
60                );
61                anyhow::bail!("No watchable paths found");
62            }
63        }
64    } else {
65        // Watch ALL agent session directories that exist
66        for config in AGENTS {
67            if config.harvestable {
68                if let Some(p) = resolve_storage_path(&home, config) {
69                    if p.exists() {
70                        watch_paths.push((config.name.to_string(), p));
71                    } else if verbose {
72                        println!(
73                            "  {} {} path not found: {}",
74                            "~".dimmed(),
75                            config.name,
76                            config.storage_hint.dimmed()
77                        );
78                    }
79                }
80            }
81        }
82    }
83
84    if watch_paths.is_empty() {
85        println!(
86            "{} No agent session directories found to watch.",
87            "[!]".yellow()
88        );
89        println!(
90            "{} Launch an agent first with 'chasm run <agent>', or specify a path with --path.",
91            "[i]".blue()
92        );
93        return Ok(());
94    }
95
96    // Header
97    println!("{}", "=".repeat(70).cyan());
98    println!(
99        "{} Watching {} path(s) for session changes",
100        "[W]".magenta().bold(),
101        watch_paths.len()
102    );
103    println!("{}", "=".repeat(70).cyan());
104    println!();
105
106    for (name, path) in &watch_paths {
107        println!(
108            "  {} {} → {}",
109            "◉".green(),
110            name.bold(),
111            path.display().to_string().dimmed()
112        );
113    }
114    println!();
115
116    if no_harvest {
117        println!(
118            "{} Dry-run mode — changes detected but not harvested",
119            "[i]".blue()
120        );
121    }
122    println!(
123        "{} Debounce interval: {}s",
124        "[i]".blue(),
125        debounce_secs
126    );
127    println!(
128        "{} Press {} to stop watching",
129        "[i]".blue(),
130        "Ctrl+C".bold()
131    );
132    println!();
133
134    // Set up file watcher
135    let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
136
137    let mut watcher: RecommendedWatcher = Watcher::new(
138        tx,
139        Config::default().with_poll_interval(Duration::from_secs(2)),
140    )
141    .context("Failed to create file system watcher")?;
142
143    // Register all paths
144    for (name, path) in &watch_paths {
145        match watcher.watch(path, RecursiveMode::Recursive) {
146            Ok(()) => {
147                if verbose {
148                    println!(
149                        "  {} Registered watcher for {} ({})",
150                        "+".green(),
151                        name,
152                        path.display()
153                    );
154                }
155            }
156            Err(e) => {
157                println!(
158                    "{} Failed to watch {} ({}): {}",
159                    "[!]".yellow(),
160                    name,
161                    path.display(),
162                    e
163                );
164            }
165        }
166    }
167
168    // Event processing loop with debouncing
169    let debounce = Duration::from_secs(debounce_secs);
170    let mut pending_files: HashSet<PathBuf> = HashSet::new();
171    let mut last_event_time: Option<Instant> = None;
172    let mut total_harvested: usize = 0;
173
174    println!(
175        "{} Watching for changes...",
176        "[*]".blue()
177    );
178    println!();
179
180    loop {
181        match rx.recv_timeout(Duration::from_millis(500)) {
182            Ok(Ok(event)) => {
183                // Filter for create/modify events on files
184                let dominated = matches!(
185                    event.kind,
186                    EventKind::Create(_) | EventKind::Modify(_)
187                );
188
189                if dominated {
190                    for path in event.paths {
191                        if path.is_file() {
192                            // Filter out non-session files (temp files, lock files, etc.)
193                            if should_watch_file(&path) {
194                                if verbose {
195                                    println!(
196                                        "  {} {:?} → {}",
197                                        "△".yellow(),
198                                        event.kind,
199                                        path.display().to_string().dimmed()
200                                    );
201                                }
202                                pending_files.insert(path);
203                                last_event_time = Some(Instant::now());
204                            }
205                        }
206                    }
207                }
208            }
209            Ok(Err(e)) => {
210                if verbose {
211                    println!(
212                        "{} Watcher error: {}",
213                        "[!]".yellow(),
214                        e
215                    );
216                }
217            }
218            Err(mpsc::RecvTimeoutError::Timeout) => {
219                // Check if we have pending files past the debounce window
220            }
221            Err(mpsc::RecvTimeoutError::Disconnected) => {
222                println!(
223                    "{} Watcher disconnected, stopping...",
224                    "[!]".yellow()
225                );
226                break;
227            }
228        }
229
230        // Process pending files if debounce period has elapsed
231        if !pending_files.is_empty() {
232            if let Some(last) = last_event_time {
233                if last.elapsed() >= debounce {
234                    let files: Vec<PathBuf> = pending_files.drain().collect();
235                    let count = files.len();
236                    last_event_time = None;
237
238                    let now = chrono::Local::now().format("%H:%M:%S");
239                    println!(
240                        "{} [{}] Detected {} new/modified file(s):",
241                        "[+]".green().bold(),
242                        now,
243                        count
244                    );
245
246                    for f in &files {
247                        if let Some(name) = f.file_name() {
248                            println!(
249                                "   {} {}",
250                                "+".green(),
251                                name.to_string_lossy().dimmed()
252                            );
253                        }
254                    }
255
256                    if !no_harvest {
257                        match auto_harvest_sessions(&files) {
258                            Ok(n) => {
259                                total_harvested += n;
260                                println!(
261                                    "{} Harvested {} session(s) (total: {})",
262                                    "[+]".green().bold(),
263                                    n,
264                                    total_harvested
265                                );
266                            }
267                            Err(e) => {
268                                println!(
269                                    "{} Harvest failed: {}",
270                                    "[!]".yellow(),
271                                    e
272                                );
273                            }
274                        }
275                    } else {
276                        println!(
277                            "{} Dry-run — skipping harvest",
278                            "[i]".blue()
279                        );
280                    }
281                    println!();
282                }
283            }
284        }
285    }
286
287    Ok(())
288}
289
/// Determine if a file is a session-related file worth watching.
///
/// The decision is based only on the final path component, compared
/// case-insensitively:
///
/// * paths without a file name are rejected;
/// * dot-files, editor swap/temp/lock files and the OS metadata files
///   `desktop.ini` / `thumbs.db` are rejected;
/// * names ending in one of the known session suffixes are accepted;
/// * names without any `.` are accepted (Claude Code uses extensionless
///   JSONL files in some cases);
/// * everything else is rejected.
///
/// Accepts any `&Path`; existing `&PathBuf` call sites keep working through
/// deref coercion.
fn should_watch_file(path: &std::path::Path) -> bool {
    // Suffixes of temp/lock/swap files that must never be harvested.
    const IGNORED_SUFFIXES: [&str; 4] = [".tmp", ".lock", ".swp", ".swo"];
    // Exact (lowercased) names of OS-generated metadata files.
    const IGNORED_NAMES: [&str; 2] = ["desktop.ini", "thumbs.db"];
    // Suffixes of files that look like session data.
    const SESSION_SUFFIXES: [&str; 9] = [
        ".json", ".jsonl", ".md", ".txt", ".yaml", ".yml", ".log", ".db", ".sqlite",
    ];

    let name = match path.file_name() {
        Some(n) => n.to_string_lossy().to_lowercase(),
        None => return false,
    };

    // The ignore rules are checked first so that e.g. `thumbs.db` is rejected
    // even though it ends in `.db`.
    let ignored = name.starts_with('.')
        || IGNORED_NAMES.contains(&name.as_str())
        || IGNORED_SUFFIXES.iter().any(|suffix| name.ends_with(*suffix));
    if ignored {
        return false;
    }

    SESSION_SUFFIXES.iter().any(|suffix| name.ends_with(*suffix)) || !name.contains('.')
}