1use crate::core::error::{Error, Result};
11use crate::index::{IndexWriter, TantivyIndex};
12use crate::parse::chunk_file;
13use notify::{
14 event::{CreateKind, ModifyKind, RemoveKind},
15 Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
16};
17use std::collections::{HashMap, HashSet};
18use std::path::{Path, PathBuf};
19use std::sync::mpsc as std_mpsc;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
23const DEBOUNCE_MS: u64 = 500;
25
26#[derive(Debug, Clone)]
28pub enum FileEvent {
29 Changed(PathBuf),
31 Deleted(PathBuf),
33}
34
35pub struct WatcherManager {
37 watchers: HashMap<PathBuf, ProjectWatcher>,
39 event_tx: std_mpsc::Sender<(PathBuf, FileEvent)>,
41 event_rx: std_mpsc::Receiver<(PathBuf, FileEvent)>,
42}
43
44impl WatcherManager {
45 pub fn new() -> Self {
46 let (event_tx, event_rx) = std_mpsc::channel();
47 Self {
48 watchers: HashMap::new(),
49 event_tx,
50 event_rx,
51 }
52 }
53
54 pub fn watch(&mut self, project_path: PathBuf) -> Result<()> {
56 if self.watchers.contains_key(&project_path) {
57 debug!(project = %project_path.display(), "Already watching");
58 return Ok(());
59 }
60
61 let watcher = ProjectWatcher::new(project_path.clone(), self.event_tx.clone())?;
62 self.watchers.insert(project_path.clone(), watcher);
63 info!(project = %project_path.display(), "Started watching");
64 Ok(())
65 }
66
67 pub fn unwatch(&mut self, project_path: &Path) {
69 if self.watchers.remove(project_path).is_some() {
70 info!(project = %project_path.display(), "Stopped watching");
71 }
72 }
73
74 pub fn process_events_sync(&mut self) -> Vec<PathBuf> {
77 let mut pending: HashMap<PathBuf, Vec<FileEvent>> = HashMap::new();
78 let debounce = Duration::from_millis(DEBOUNCE_MS);
79
80 loop {
82 match self.event_rx.recv_timeout(debounce) {
83 Ok((project, event)) => {
84 pending.entry(project).or_default().push(event);
85 }
86 Err(std_mpsc::RecvTimeoutError::Timeout) => {
87 if !pending.is_empty() {
89 break;
90 }
91 return Vec::new();
93 }
94 Err(std_mpsc::RecvTimeoutError::Disconnected) => {
95 break;
96 }
97 }
98 }
99
100 let mut updated = Vec::new();
102 for (project_path, events) in pending {
103 if let Err(e) = process_project_events_sync(&project_path, events) {
104 warn!(project = %project_path.display(), error = %e, "Failed to process events");
105 } else {
106 updated.push(project_path);
107 }
108 }
109
110 updated
111 }
112}
113
114impl Default for WatcherManager {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120struct ProjectWatcher {
122 #[allow(dead_code)]
123 watcher: RecommendedWatcher,
124 #[allow(dead_code)]
125 project_path: PathBuf,
126}
127
128impl ProjectWatcher {
129 fn new(
130 project_path: PathBuf,
131 event_tx: std_mpsc::Sender<(PathBuf, FileEvent)>,
132 ) -> Result<Self> {
133 let project_path_clone = project_path.clone();
134
135 let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
136 if let Ok(event) = res {
137 if let Some(file_event) = classify_event(&event) {
138 let _ = event_tx.send((project_path_clone.clone(), file_event));
140 }
141 }
142 })
143 .map_err(|e| Error::WatchError {
144 message: e.to_string(),
145 })?;
146
147 watcher
148 .watch(&project_path, RecursiveMode::Recursive)
149 .map_err(|e| Error::WatchError {
150 message: e.to_string(),
151 })?;
152
153 Ok(Self {
154 watcher,
155 project_path,
156 })
157 }
158}
159
160fn classify_event(event: &Event) -> Option<FileEvent> {
162 let paths: Vec<_> = event
164 .paths
165 .iter()
166 .filter(|p| p.is_file() || !p.exists()) .filter(|p| is_indexable_file(p))
168 .cloned()
169 .collect();
170
171 if paths.is_empty() {
172 return None;
173 }
174
175 let path = paths.into_iter().next()?;
176
177 match &event.kind {
178 EventKind::Create(CreateKind::File) => Some(FileEvent::Changed(path)),
179 EventKind::Modify(ModifyKind::Data(_)) => Some(FileEvent::Changed(path)),
180 EventKind::Modify(ModifyKind::Name(_)) => Some(FileEvent::Changed(path)),
181 EventKind::Remove(RemoveKind::File) => Some(FileEvent::Deleted(path)),
182 _ => None,
183 }
184}
185
186fn is_indexable_file(path: &Path) -> bool {
188 if path
190 .components()
191 .any(|c| c.as_os_str().to_string_lossy().starts_with('.'))
192 {
193 return false;
194 }
195
196 let ext = path
198 .extension()
199 .and_then(|e| e.to_str())
200 .unwrap_or("")
201 .to_lowercase();
202
203 matches!(
204 ext.as_str(),
205 "ts" | "tsx"
206 | "js"
207 | "jsx"
208 | "mjs"
209 | "cjs"
210 | "py"
211 | "pyi"
212 | "rs"
213 | "go"
214 | "java"
215 | "kt"
216 | "kts"
217 | "scala"
218 | "rb"
219 | "php"
220 | "c"
221 | "h"
222 | "cpp"
223 | "cc"
224 | "cxx"
225 | "hpp"
226 | "cs"
227 | "swift"
228 | "ex"
229 | "exs"
230 | "erl"
231 | "hrl"
232 | "hs"
233 | "ml"
234 | "mli"
235 | "lua"
236 | "sh"
237 | "bash"
238 | "zsh"
239 | "sql"
240 | "vue"
241 | "svelte"
242 )
243}
244
245fn process_project_events_sync(project_path: &Path, events: Vec<FileEvent>) -> Result<()> {
247 let mut to_reindex: HashSet<PathBuf> = HashSet::new();
249 let mut to_delete: HashSet<PathBuf> = HashSet::new();
250
251 for event in events {
252 match event {
253 FileEvent::Changed(path) => {
254 to_delete.remove(&path); to_reindex.insert(path);
256 }
257 FileEvent::Deleted(path) => {
258 to_reindex.remove(&path); to_delete.insert(path);
260 }
261 }
262 }
263
264 if to_reindex.is_empty() && to_delete.is_empty() {
265 return Ok(());
266 }
267
268 info!(
269 project = %project_path.display(),
270 reindex = to_reindex.len(),
271 delete = to_delete.len(),
272 "Processing file changes"
273 );
274
275 let index = TantivyIndex::open_or_create(project_path)?;
277 let mut writer = IndexWriter::new(&index)?;
278
279 let all_paths: Vec<_> = to_reindex.iter().chain(to_delete.iter()).collect();
281 for path in &all_paths {
282 let path_str = path.to_string_lossy();
283 writer.delete_by_path(&path_str)?;
284 }
285
286 for path in to_reindex {
288 if let Ok(content) = std::fs::read_to_string(&path) {
289 let chunks = chunk_file(&path, &content);
290 for chunk in &chunks {
291 writer.add_chunk(chunk)?;
292 }
293 debug!(path = %path.display(), chunks = chunks.len(), "Re-indexed file");
294 }
295 }
296
297 writer.commit()?;
299
300 info!(project = %project_path.display(), "Incremental index update complete");
301 Ok(())
302}
303
304#[cfg(test)]
305mod tests {
306 use super::*;
307
308 #[test]
309 fn test_is_indexable_file() {
310 assert!(is_indexable_file(Path::new("src/main.rs")));
311 assert!(is_indexable_file(Path::new("app.tsx")));
312 assert!(!is_indexable_file(Path::new(".git/config")));
313 assert!(!is_indexable_file(Path::new("image.png")));
314 }
315}