localgpt_core/memory/
watcher.rs1use anyhow::Result;
4use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::PathBuf;
6use std::sync::mpsc;
7use std::time::Duration;
8use tracing::{debug, info, warn};
9
10use super::MemoryIndex;
11use crate::config::MemoryConfig;
12
13pub struct MemoryWatcher {
14 #[allow(dead_code)]
15 watcher: RecommendedWatcher,
16 #[allow(dead_code)]
17 workspace: PathBuf,
18 #[allow(dead_code)]
19 watched_paths: Vec<PathBuf>,
20}
21
22impl MemoryWatcher {
23 pub fn new(workspace: PathBuf, db_path: PathBuf, config: MemoryConfig) -> Result<Self> {
24 let (tx, rx) = mpsc::channel();
26
27 let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
29 match res {
30 Ok(event) => {
31 match event.kind {
33 EventKind::Modify(_) | EventKind::Create(_) => {
34 for path in event.paths {
35 if path.extension().map(|e| e == "md").unwrap_or(false)
36 && let Err(e) = tx.send(path.clone())
37 {
38 warn!("Failed to send event: {}", e);
39 }
40 }
41 }
42 _ => {}
43 }
44 }
45 Err(e) => warn!("Watch error: {:?}", e),
46 }
47 })?;
48
49 watcher.watch(&workspace, RecursiveMode::Recursive)?;
51 info!("Watching memory files in: {}", workspace.display());
52
53 let mut watched_paths = vec![workspace.clone()];
55 for index_path in &config.paths {
56 let base_path = if index_path.path.starts_with('~') || index_path.path.starts_with('/')
57 {
58 PathBuf::from(shellexpand::tilde(&index_path.path).to_string())
59 } else {
60 workspace.join(&index_path.path)
61 };
62
63 if base_path.starts_with(&workspace) {
65 continue;
66 }
67
68 if base_path.exists() {
69 if let Err(e) = watcher.watch(&base_path, RecursiveMode::Recursive) {
70 warn!("Failed to watch {}: {}", base_path.display(), e);
71 } else {
72 info!("Watching configured path: {}", base_path.display());
73 watched_paths.push(base_path);
74 }
75 } else {
76 debug!("Skipping non-existent path: {}", base_path.display());
77 }
78 }
79
80 let workspace_for_task = workspace.clone();
82 let db_path_for_task = db_path.clone();
83 let chunk_size = config.chunk_size;
84 let chunk_overlap = config.chunk_overlap;
85 std::thread::spawn(move || {
86 let index = match MemoryIndex::new_with_db_path(&workspace_for_task, &db_path_for_task)
87 {
88 Ok(idx) => idx.with_chunk_config(chunk_size, chunk_overlap),
89 Err(e) => {
90 warn!("Failed to create memory index for watcher: {}", e);
91 return;
92 }
93 };
94
95 let debounce_duration = Duration::from_secs(2);
97
98 loop {
99 match rx.recv_timeout(Duration::from_secs(1)) {
100 Ok(path) => {
101 debug!("File changed: {}", path.display());
102
103 let mut last_event_time = std::time::Instant::now();
105 while last_event_time.elapsed() < debounce_duration {
106 match rx.recv_timeout(debounce_duration - last_event_time.elapsed()) {
107 Ok(p) => {
108 debug!("Additional file changed: {}", p.display());
109 last_event_time = std::time::Instant::now();
110 }
111 Err(mpsc::RecvTimeoutError::Timeout) => break,
112 Err(mpsc::RecvTimeoutError::Disconnected) => return,
113 }
114 }
115
116 if let Err(e) = index.index_file(&path, false) {
118 warn!("Failed to reindex file {}: {}", path.display(), e);
119 } else {
120 info!("Reindexed: {}", path.display());
121 }
122 }
123 Err(mpsc::RecvTimeoutError::Timeout) => continue,
124 Err(mpsc::RecvTimeoutError::Disconnected) => {
125 info!("Watcher channel disconnected");
126 return;
127 }
128 }
129 }
130 });
131
132 Ok(Self {
133 watcher,
134 workspace,
135 watched_paths,
136 })
137 }
138}