Skip to main content

localgpt_core/memory/
watcher.rs

1//! File system watcher for automatic memory reindexing
2
3use anyhow::Result;
4use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::PathBuf;
6use std::sync::mpsc;
7use std::time::Duration;
8use tracing::{debug, info, warn};
9
10use super::MemoryIndex;
11use crate::config::MemoryConfig;
12
13pub struct MemoryWatcher {
14    #[allow(dead_code)]
15    watcher: RecommendedWatcher,
16    #[allow(dead_code)]
17    workspace: PathBuf,
18    #[allow(dead_code)]
19    watched_paths: Vec<PathBuf>,
20}
21
22impl MemoryWatcher {
23    pub fn new(workspace: PathBuf, db_path: PathBuf, config: MemoryConfig) -> Result<Self> {
24        // Create a channel for receiving events
25        let (tx, rx) = mpsc::channel();
26
27        // Create watcher with debounce
28        let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
29            match res {
30                Ok(event) => {
31                    // Filter for modify/create events on .md files
32                    match event.kind {
33                        EventKind::Modify(_) | EventKind::Create(_) => {
34                            for path in event.paths {
35                                if path.extension().map(|e| e == "md").unwrap_or(false)
36                                    && let Err(e) = tx.send(path.clone())
37                                {
38                                    warn!("Failed to send event: {}", e);
39                                }
40                            }
41                        }
42                        _ => {}
43                    }
44                }
45                Err(e) => warn!("Watch error: {:?}", e),
46            }
47        })?;
48
49        // Watch the workspace directory
50        watcher.watch(&workspace, RecursiveMode::Recursive)?;
51        info!("Watching memory files in: {}", workspace.display());
52
53        // Watch configured paths
54        let mut watched_paths = vec![workspace.clone()];
55        for index_path in &config.paths {
56            let base_path = if index_path.path.starts_with('~') || index_path.path.starts_with('/')
57            {
58                PathBuf::from(shellexpand::tilde(&index_path.path).to_string())
59            } else {
60                workspace.join(&index_path.path)
61            };
62
63            // Skip if already watching (subdirectory of workspace)
64            if base_path.starts_with(&workspace) {
65                continue;
66            }
67
68            if base_path.exists() {
69                if let Err(e) = watcher.watch(&base_path, RecursiveMode::Recursive) {
70                    warn!("Failed to watch {}: {}", base_path.display(), e);
71                } else {
72                    info!("Watching configured path: {}", base_path.display());
73                    watched_paths.push(base_path);
74                }
75            } else {
76                debug!("Skipping non-existent path: {}", base_path.display());
77            }
78        }
79
80        // Spawn background task to handle events
81        let workspace_for_task = workspace.clone();
82        let db_path_for_task = db_path.clone();
83        let chunk_size = config.chunk_size;
84        let chunk_overlap = config.chunk_overlap;
85        std::thread::spawn(move || {
86            let index = match MemoryIndex::new_with_db_path(&workspace_for_task, &db_path_for_task)
87            {
88                Ok(idx) => idx.with_chunk_config(chunk_size, chunk_overlap),
89                Err(e) => {
90                    warn!("Failed to create memory index for watcher: {}", e);
91                    return;
92                }
93            };
94
95            // Debounce events
96            let debounce_duration = Duration::from_secs(2);
97
98            loop {
99                match rx.recv_timeout(Duration::from_secs(1)) {
100                    Ok(path) => {
101                        debug!("File changed: {}", path.display());
102
103                        // Debounce: wait for events to settle
104                        let mut last_event_time = std::time::Instant::now();
105                        while last_event_time.elapsed() < debounce_duration {
106                            match rx.recv_timeout(debounce_duration - last_event_time.elapsed()) {
107                                Ok(p) => {
108                                    debug!("Additional file changed: {}", p.display());
109                                    last_event_time = std::time::Instant::now();
110                                }
111                                Err(mpsc::RecvTimeoutError::Timeout) => break,
112                                Err(mpsc::RecvTimeoutError::Disconnected) => return,
113                            }
114                        }
115
116                        // Reindex the file
117                        if let Err(e) = index.index_file(&path, false) {
118                            warn!("Failed to reindex file {}: {}", path.display(), e);
119                        } else {
120                            info!("Reindexed: {}", path.display());
121                        }
122                    }
123                    Err(mpsc::RecvTimeoutError::Timeout) => continue,
124                    Err(mpsc::RecvTimeoutError::Disconnected) => {
125                        info!("Watcher channel disconnected");
126                        return;
127                    }
128                }
129            }
130        });
131
132        Ok(Self {
133            watcher,
134            workspace,
135            watched_paths,
136        })
137    }
138}