Skip to main content

st/mcp/
unified_watcher.rs

1// Unified Watcher - Master control for all context absorption and searching
2// "The all-seeing eye of Smart Tree!" - Aye
3
4use anyhow::Result;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex};
9use std::thread;
10use std::time::Duration;
11use tokio::sync::Mutex as TokioMutex;
12
13use super::context_absorber::ContextAbsorber;
14use super::smart_background_searcher::{SearchConfig, SmartBackgroundSearcher};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct UnifiedWatcherConfig {
18    pub project_name: String,
19    pub watch_paths: Vec<String>,
20    pub enable_absorption: bool,
21    pub enable_search: bool,
22    pub enable_logging: bool,
23    pub auto_start: bool,
24}
25
26impl Default for UnifiedWatcherConfig {
27    fn default() -> Self {
28        Self {
29            project_name: "smart-tree".to_string(),
30            watch_paths: vec![
31                "~/Documents/".to_string(),
32                "~/.config/".to_string(),
33                "~/Library/Application Support/Claude/".to_string(),
34                "~/.cursor/".to_string(),
35                "~/.vscode/".to_string(),
36            ],
37            enable_absorption: true,
38            enable_search: true,
39            enable_logging: true,
40            auto_start: false,
41        }
42    }
43}
44
45pub struct UnifiedWatcher {
46    config: UnifiedWatcherConfig,
47    absorber: Option<Arc<Mutex<ContextAbsorber>>>,
48    searcher: Option<Arc<TokioMutex<SmartBackgroundSearcher>>>,
49    status: Arc<Mutex<WatcherStatus>>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct WatcherStatus {
54    pub is_running: bool,
55    pub files_watched: usize,
56    pub contexts_absorbed: usize,
57    pub search_results_cached: usize,
58    pub last_activity: Option<String>,
59    pub watched_directories: Vec<String>,
60}
61
62impl UnifiedWatcher {
63    pub fn new(config: UnifiedWatcherConfig) -> Result<Self> {
64        let status = Arc::new(Mutex::new(WatcherStatus {
65            is_running: false,
66            files_watched: 0,
67            contexts_absorbed: 0,
68            search_results_cached: 0,
69            last_activity: None,
70            watched_directories: config.watch_paths.clone(),
71        }));
72
73        Ok(Self {
74            config,
75            absorber: None,
76            searcher: None,
77            status,
78        })
79    }
80
81    pub async fn start(&mut self) -> Result<()> {
82        println!(
83            "šŸš€ Starting Unified Watcher for project: {}",
84            self.config.project_name
85        );
86
87        // Initialize activity logging if enabled
88        if self.config.enable_logging {
89            crate::activity_logger::ActivityLogger::init(Some("~/.st/watcher.jsonl".to_string()))?;
90            crate::activity_logger::ActivityLogger::log_event(
91                "watcher",
92                "start",
93                serde_json::json!({
94                    "project": self.config.project_name,
95                    "watch_paths": self.config.watch_paths,
96                }),
97            )?;
98        }
99
100        // Expand watch paths
101        let watch_paths: Vec<PathBuf> = self
102            .config
103            .watch_paths
104            .iter()
105            .map(|p| PathBuf::from(shellexpand::tilde(p).to_string()))
106            .filter(|p| p.exists())
107            .collect();
108
109        // Start context absorber if enabled
110        if self.config.enable_absorption {
111            println!("🧽 Starting Context Absorber...");
112            let mut absorber = ContextAbsorber::new(self.config.project_name.clone())?;
113            absorber.start_watching()?;
114            self.absorber = Some(Arc::new(Mutex::new(absorber)));
115            println!("   āœ… Context Absorber active");
116        }
117
118        // Start smart searcher if enabled
119        if self.config.enable_search {
120            println!("šŸ” Starting Smart Background Searcher...");
121            let search_config = SearchConfig {
122                max_lines_per_file: 1000, // Limit for JSONL files
123                smart_sampling: true,
124                ..Default::default()
125            };
126            let mut searcher = SmartBackgroundSearcher::new(search_config)?;
127            searcher.start_watching(watch_paths.clone())?;
128            self.searcher = Some(Arc::new(TokioMutex::new(searcher)));
129            println!("   āœ… Smart Searcher active");
130        }
131
132        // Update status
133        if let Ok(mut status) = self.status.lock() {
134            status.is_running = true;
135            status.watched_directories = watch_paths
136                .iter()
137                .map(|p| p.to_string_lossy().to_string())
138                .collect();
139            status.last_activity = Some(format!("Started watching at {}", chrono::Utc::now()));
140        }
141
142        // Start monitoring thread
143        self.start_monitor_thread();
144
145        println!("\n✨ Unified Watcher is now active!");
146        println!("šŸ“‚ Watching {} directories", watch_paths.len());
147        println!("šŸŽÆ Project: {}", self.config.project_name);
148
149        Ok(())
150    }
151
152    fn start_monitor_thread(&self) {
153        let status = self.status.clone();
154        let absorber = self.absorber.clone();
155        let _searcher = self.searcher.clone();
156
157        thread::spawn(move || {
158            loop {
159                thread::sleep(Duration::from_secs(30));
160
161                // Update status periodically
162                if let Ok(mut stat) = status.lock() {
163                    // Get absorbed context count
164                    if let Some(abs) = &absorber {
165                        if let Ok(abs_lock) = abs.lock() {
166                            stat.contexts_absorbed = abs_lock.get_absorbed_contexts().len();
167                        }
168                    }
169
170                    // Update last activity
171                    stat.last_activity = Some(format!("Active at {}", chrono::Utc::now()));
172                }
173            }
174        });
175    }
176
177    pub async fn stop(&mut self) -> Result<()> {
178        println!("šŸ›‘ Stopping Unified Watcher...");
179
180        // Stop absorber
181        if let Some(abs) = &self.absorber {
182            if let Ok(mut abs_lock) = abs.lock() {
183                abs_lock.stop_watching();
184            }
185        }
186
187        // Clear searcher cache
188        if let Some(search) = &self.searcher {
189            let search_lock = search.lock().await;
190            search_lock.clear_cache();
191        }
192
193        // Update status
194        if let Ok(mut status) = self.status.lock() {
195            status.is_running = false;
196            status.last_activity = Some(format!("Stopped at {}", chrono::Utc::now()));
197        }
198
199        // Log shutdown
200        if self.config.enable_logging {
201            crate::activity_logger::ActivityLogger::log_event(
202                "watcher",
203                "stop",
204                serde_json::json!({
205                    "project": self.config.project_name,
206                }),
207            )?;
208        }
209
210        Ok(())
211    }
212
213    pub async fn search(&self, query: &str) -> Result<Vec<Value>> {
214        if let Some(searcher) = &self.searcher {
215            let search_lock = searcher.lock().await;
216            let paths: Vec<PathBuf> = self
217                .config
218                .watch_paths
219                .iter()
220                .map(|p| PathBuf::from(shellexpand::tilde(p).to_string()))
221                .collect();
222
223            let results = search_lock.search(query, paths).await;
224
225            // Convert to JSON for MCP
226            let json_results: Vec<Value> = results
227                .into_iter()
228                .map(|r| {
229                    serde_json::json!({
230                        "file": r.file_path.to_string_lossy(),
231                        "line": r.line_number,
232                        "content": r.content,
233                        "score": r.score,
234                        "type": r.file_type,
235                    })
236                })
237                .collect();
238
239            return Ok(json_results);
240        }
241        Ok(Vec::new())
242    }
243
244    pub fn get_status(&self) -> WatcherStatus {
245        self.status.lock().unwrap().clone()
246    }
247}
248
249// MCP Tool Handler
250pub async fn handle_unified_watcher(
251    params: Value,
252    _ctx: Arc<crate::mcp::McpContext>,
253) -> Result<Value> {
254    let action = params["action"].as_str().unwrap_or("status");
255
256    // Use a static instance for the watcher (TokioMutex for async-safe access)
257    static WATCHER: Lazy<Arc<TokioMutex<Option<UnifiedWatcher>>>> =
258        Lazy::new(|| Arc::new(TokioMutex::new(None)));
259
260    match action {
261        "start" => {
262            let project = params["project"]
263                .as_str()
264                .map(|s| s.to_string())
265                .unwrap_or_else(|| {
266                    // Try to detect project from current directory
267                    std::env::current_dir()
268                        .ok()
269                        .and_then(|p| p.file_name().map(|n| n.to_os_string()))
270                        .and_then(|n| n.to_str().map(|s| s.to_string()))
271                        .unwrap_or_else(|| "unknown".to_string())
272                });
273
274            let watch_paths = params["paths"]
275                .as_array()
276                .map(|arr| {
277                    arr.iter()
278                        .filter_map(|v| v.as_str())
279                        .map(|s| s.to_string())
280                        .collect()
281                })
282                .unwrap_or_else(|| UnifiedWatcherConfig::default().watch_paths);
283
284            let config = UnifiedWatcherConfig {
285                project_name: project.to_string(),
286                watch_paths,
287                enable_absorption: params["enable_absorption"].as_bool().unwrap_or(true),
288                enable_search: params["enable_search"].as_bool().unwrap_or(true),
289                enable_logging: params["enable_logging"].as_bool().unwrap_or(true),
290                auto_start: false,
291            };
292
293            let mut watcher = UnifiedWatcher::new(config)?;
294            watcher.start().await?;
295
296            let status = watcher.get_status();
297
298            // Store the watcher
299            *WATCHER.lock().await = Some(watcher);
300
301            Ok(serde_json::json!({
302                "status": "started",
303                "project": project,
304                "watching": status.watched_directories,
305                "features": {
306                    "absorption": params["enable_absorption"].as_bool().unwrap_or(true),
307                    "search": params["enable_search"].as_bool().unwrap_or(true),
308                    "logging": params["enable_logging"].as_bool().unwrap_or(true),
309                },
310                "message": format!("šŸš€ Unified Watcher active for '{}'", project)
311            }))
312        }
313
314        "stop" => {
315            if let Some(mut watcher) = WATCHER.lock().await.take() {
316                watcher.stop().await?;
317                Ok(serde_json::json!({
318                    "status": "stopped",
319                    "message": "Watcher stopped successfully"
320                }))
321            } else {
322                Ok(serde_json::json!({
323                    "status": "not_running",
324                    "message": "No watcher is currently running"
325                }))
326            }
327        }
328
329        "search" => {
330            let query = params["query"]
331                .as_str()
332                .ok_or_else(|| anyhow::anyhow!("Missing query parameter"))?;
333
334            let guard = WATCHER.lock().await;
335            if let Some(watcher) = guard.as_ref() {
336                let results = watcher.search(query).await?;
337                Ok(serde_json::json!({
338                    "query": query,
339                    "results": results,
340                    "count": results.len(),
341                }))
342            } else {
343                Ok(serde_json::json!({
344                    "error": "Watcher not running",
345                    "message": "Start the watcher first with action: 'start'"
346                }))
347            }
348        }
349
350        "status" => {
351            let guard = WATCHER.lock().await;
352            if let Some(watcher) = guard.as_ref() {
353                let status = watcher.get_status();
354                Ok(serde_json::json!({
355                    "running": status.is_running,
356                    "files_watched": status.files_watched,
357                    "contexts_absorbed": status.contexts_absorbed,
358                    "search_results_cached": status.search_results_cached,
359                    "last_activity": status.last_activity,
360                    "watched_directories": status.watched_directories,
361                }))
362            } else {
363                Ok(serde_json::json!({
364                    "running": false,
365                    "message": "No watcher configured"
366                }))
367            }
368        }
369
370        _ => Err(anyhow::anyhow!(
371            "Unknown action: {}. Valid actions: start, stop, search, status",
372            action
373        )),
374    }
375}
376
377use once_cell::sync::Lazy;