1use crate::error::Result;
2use crate::sync::{RepositorySynchronizer, SyncConfig};
3use git2::Repository;
4use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::mpsc;
9use tokio::sync::Mutex;
10use tokio::time;
11use tracing::{debug, error, info};
12
13#[derive(Debug, Clone)]
15pub struct WatchConfig {
16 pub debounce_ms: u64,
18
19 pub min_interval_ms: u64,
21
22 pub sync_on_start: bool,
24
25 pub dry_run: bool,
27}
28
29impl Default for WatchConfig {
30 fn default() -> Self {
31 Self {
32 debounce_ms: 500,
33 min_interval_ms: 1000,
34 sync_on_start: true,
35 dry_run: false,
36 }
37 }
38}
39
40pub struct WatchManager {
42 repo_path: String,
43 sync_config: SyncConfig,
44 watch_config: WatchConfig,
45 is_syncing: Arc<Mutex<bool>>,
46}
47
48struct FileEventHandler {
50 repo_path: PathBuf,
51 tx: mpsc::Sender<Event>,
52}
53
54impl FileEventHandler {
55 fn new(repo_path: PathBuf, tx: mpsc::Sender<Event>) -> Self {
56 Self { repo_path, tx }
57 }
58
59 fn handle_event(&self, res: std::result::Result<Event, notify::Error>) {
60 let event = match res {
61 Ok(event) => event,
62 Err(e) => {
63 error!("Watch error: {}", e);
64 return;
65 }
66 };
67
68 debug!("Raw file event received: {:?}", event);
69
70 if !self.should_process_event(&event) {
71 return;
72 }
73
74 debug!("Event is relevant, sending to channel");
75 if let Err(e) = self.tx.blocking_send(event.clone()) {
76 error!("Failed to send event to channel: {}", e);
77 } else {
78 debug!("Event sent successfully: {:?}", event.kind);
79 }
80 }
81
82 fn should_process_event(&self, event: &Event) -> bool {
83 if self.is_git_internal(event) {
85 debug!("Ignoring git internal event");
86 return false;
87 }
88
89 let repo = match Repository::open(&self.repo_path) {
91 Ok(r) => r,
92 Err(e) => {
93 error!("Failed to open repository for gitignore check: {}", e);
94 return false;
95 }
96 };
97
98 let should_ignore = event
100 .paths
101 .iter()
102 .any(|path| self.should_ignore_path(&repo, path));
103
104 if should_ignore {
105 debug!("Ignoring gitignored file event");
106 return false;
107 }
108
109 if !self.is_relevant_change(event) {
111 debug!("Event not considered relevant: {:?}", event.kind);
112 return false;
113 }
114
115 true
116 }
117
118 fn is_git_internal(&self, event: &Event) -> bool {
120 event
121 .paths
122 .iter()
123 .any(|path| path.components().any(|c| c.as_os_str() == ".git"))
124 }
125
126 fn should_ignore_path(&self, repo: &Repository, file_path: &Path) -> bool {
128 let relative_path = match file_path.strip_prefix(&self.repo_path) {
130 Ok(p) => p,
131 Err(_) => {
132 debug!("Path {:?} is outside repo, ignoring", file_path);
133 return true;
134 }
135 };
136
137 match repo.status_should_ignore(relative_path) {
139 Ok(ignored) => {
140 if ignored {
141 debug!("Path {:?} is gitignored", relative_path);
142 }
143 ignored
144 }
145 Err(e) => {
146 debug!(
147 "Error checking gitignore status for {:?}: {}",
148 relative_path, e
149 );
150 false
151 }
152 }
153 }
154
155 fn is_relevant_change(&self, event: &Event) -> bool {
157 let is_relevant = matches!(
158 event.kind,
159 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
160 );
161
162 debug!(
163 "is_relevant_change: kind={:?}, relevant={}",
164 event.kind, is_relevant
165 );
166
167 is_relevant
168 }
169}
170
171impl WatchManager {
172 pub fn new(
174 repo_path: impl AsRef<Path>,
175 sync_config: SyncConfig,
176 watch_config: WatchConfig,
177 ) -> Self {
178 let path_str = repo_path.as_ref().to_string_lossy();
180 let expanded = shellexpand::tilde(&path_str).to_string();
181
182 Self {
183 repo_path: expanded,
184 sync_config,
185 watch_config,
186 is_syncing: Arc::new(Mutex::new(false)),
187 }
188 }
189
190 pub async fn watch(&self) -> Result<()> {
192 info!("Starting watch mode for: {}", self.repo_path);
193
194 if self.watch_config.sync_on_start {
196 info!("Performing initial sync");
197 self.perform_sync().await?;
198 }
199
200 let (tx, rx) = mpsc::channel::<Event>(100);
202
203 let _watcher = self.setup_watcher(tx)?;
205
206 info!(
207 "Watching for changes (debounce: {}s)",
208 self.watch_config.debounce_ms as f64 / 1000.0
209 );
210
211 self.process_events(rx).await
213 }
214
215 fn setup_watcher(&self, tx: mpsc::Sender<Event>) -> Result<RecommendedWatcher> {
217 let repo_path_clone = PathBuf::from(&self.repo_path);
218 let handler = FileEventHandler::new(repo_path_clone, tx);
219
220 let mut watcher =
221 RecommendedWatcher::new(move |res| handler.handle_event(res), Config::default())?;
222
223 watcher.watch(Path::new(&self.repo_path), RecursiveMode::Recursive)?;
225
226 Ok(watcher)
227 }
228
229 async fn process_events(&self, mut rx: mpsc::Receiver<Event>) -> Result<()> {
231 let mut sync_state = SyncState::new(
232 self.watch_config.debounce_ms,
233 self.watch_config.min_interval_ms,
234 );
235
236 loop {
237 let timeout = time::sleep(Duration::from_millis(self.watch_config.debounce_ms));
238 tokio::pin!(timeout);
239
240 tokio::select! {
241 Some(event) = rx.recv() => {
242 self.handle_file_event(event, &mut sync_state);
243 }
244 _ = &mut timeout => {
245 self.handle_timeout(&mut sync_state).await;
246 }
247 }
248 }
249 }
250
251 fn handle_file_event(&self, event: Event, sync_state: &mut SyncState) {
253 debug!("Received event from channel: {:?}", event);
254 debug!("Event kind: {:?}, paths: {:?}", event.kind, event.paths);
255
256 if self.is_relevant_change(&event) {
259 info!("Relevant change detected, marking pending sync");
260 sync_state.mark_pending();
261 } else {
262 debug!("Event not considered relevant: {:?}", event.kind);
263 }
264 }
265
266 fn is_relevant_change(&self, event: &Event) -> bool {
268 let is_relevant = matches!(
269 event.kind,
270 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
271 );
272
273 debug!(
274 "is_relevant_change: kind={:?}, relevant={}",
275 event.kind, is_relevant
276 );
277
278 is_relevant
279 }
280
281 async fn handle_timeout(&self, sync_state: &mut SyncState) {
283 if !sync_state.should_sync() {
284 return;
285 }
286
287 let is_syncing = self.is_syncing.lock().await;
289 if *is_syncing {
290 debug!("Sync already in progress, skipping");
291 return;
292 }
293 drop(is_syncing); info!("Changes detected, triggering sync");
296 if let Err(e) = self.perform_sync().await {
297 error!("Sync failed: {}", e);
298 }
299
300 sync_state.record_sync();
301 }
302
303 async fn perform_sync(&self) -> Result<()> {
305 {
307 let mut is_syncing = self.is_syncing.lock().await;
308 if *is_syncing {
309 debug!("Sync already in progress");
310 return Ok(());
311 }
312 *is_syncing = true;
313 }
314
315 if self.watch_config.dry_run {
316 info!("DRY RUN: Would perform sync now");
317 {
319 let mut is_syncing = self.is_syncing.lock().await;
320 *is_syncing = false;
321 }
322 return Ok(());
323 }
324
325 let repo_path = self.repo_path.clone();
327 let sync_config = self.sync_config.clone();
328
329 tokio::task::spawn_blocking(move || {
330 let synchronizer =
332 RepositorySynchronizer::new_with_detected_branch(&repo_path, sync_config)?;
333
334 synchronizer.sync(false)
336 })
337 .await??;
338
339 {
341 let mut is_syncing = self.is_syncing.lock().await;
342 *is_syncing = false;
343 }
344
345 Ok(())
346 }
347}
348
349struct SyncState {
351 last_sync: time::Instant,
352 pending_sync: bool,
353 min_interval: Duration,
354}
355
356impl SyncState {
357 fn new(_debounce_ms: u64, min_interval_ms: u64) -> Self {
358 Self {
359 last_sync: time::Instant::now(),
360 pending_sync: false,
361 min_interval: Duration::from_millis(min_interval_ms),
362 }
363 }
364
365 fn mark_pending(&mut self) {
366 self.pending_sync = true;
367 }
368
369 fn should_sync(&self) -> bool {
370 if !self.pending_sync {
371 return false;
372 }
373
374 let elapsed = self.last_sync.elapsed();
375 if elapsed < self.min_interval {
376 debug!("Too soon since last sync, waiting");
377 return false;
378 }
379
380 true
381 }
382
383 fn record_sync(&mut self) {
384 self.last_sync = time::Instant::now();
385 self.pending_sync = false;
386 }
387}
388
389pub async fn watch_with_periodic_sync(
391 repo_path: impl AsRef<Path>,
392 sync_config: SyncConfig,
393 watch_config: WatchConfig,
394 sync_interval_ms: Option<u64>,
395) -> Result<()> {
396 let manager = WatchManager::new(repo_path, sync_config, watch_config);
397
398 if let Some(interval_ms) = sync_interval_ms {
399 info!(
401 "Periodic sync enabled (interval: {}s)",
402 interval_ms as f64 / 1000.0
403 );
404
405 let manager_clone = Arc::new(manager);
406 let manager_watch = manager_clone.clone();
407
408 let watch_handle = tokio::spawn(async move { manager_watch.watch().await });
410
411 let periodic_handle = tokio::spawn(async move {
413 let mut interval = time::interval(Duration::from_millis(interval_ms));
414 interval.tick().await; loop {
417 interval.tick().await;
418 info!("Periodic sync triggered");
419 if let Err(e) = manager_clone.perform_sync().await {
420 error!("Periodic sync failed: {}", e);
421 }
422 }
423 });
424
425 tokio::select! {
427 result = watch_handle => result?,
428 result = periodic_handle => result?,
429 }
430 } else {
431 manager.watch().await
433 }
434}