// lore_cli/daemon/watcher.rs
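//! Filesystem watcher for the capture daemon: discovers session source files
//! (`.jsonl` and `.vscdb`), imports them into the local database, and keeps
//! the shared `DaemonStats` counters up to date.
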
use anyhow::{Context, Result};
use notify::RecursiveMode;
use notify_debouncer_mini::{new_debouncer, DebouncedEvent};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};

use crate::capture::watchers::default_registry;
use crate::storage::Database;

use super::state::DaemonStats;

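/// Location of the daemon database; callers open their own connections via
/// [`DbConfig::open`].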
#[derive(Clone)]
pub struct DbConfig {
    path: PathBuf,
}

impl DbConfig {
    pub fn default_config() -> Result<Self> {
        let path = crate::storage::db::default_db_path()?;
        Ok(Self { path })
    }

    pub fn open(&self) -> Result<Database> {
        Database::open(&self.path)
    }
}

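/// Watches the session directories reported by the capture registry, remembers
/// how far each file has already been read, and imports new session data into
/// the database.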
pub struct SessionWatcher {
    file_positions: HashMap<PathBuf, u64>,
    watch_dirs: Vec<PathBuf>,
    db_config: DbConfig,
}

impl SessionWatcher {
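    /// Builds a watcher from the default capture registry's watch paths and
    /// the default database configuration.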
    pub fn new() -> Result<Self> {
        let registry = default_registry();
        let watch_dirs = registry.all_watch_paths();

        let db_config = DbConfig::default_config()?;

        Ok(Self {
            file_positions: HashMap::new(),
            watch_dirs,
            db_config,
        })
    }

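    /// Directories this watcher monitors for session files.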
    #[allow(dead_code)]
    pub fn watch_dirs(&self) -> &[PathBuf] {
        &self.watch_dirs
    }

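    /// Runs the watcher until a shutdown signal arrives: registers the watch
    /// directories with a debounced `notify` watcher, performs an initial scan
    /// of existing session files, then imports files as change events arrive,
    /// recording progress and errors in `stats`.
    ///
    /// A minimal sketch of how a daemon task might drive this (assuming
    /// `DaemonStats` implements `Default`; adapt to the real daemon wiring):
    ///
    /// ```ignore
    /// let stats = Arc::new(RwLock::new(DaemonStats::default()));
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
    /// let mut watcher = SessionWatcher::new()?;
    /// let watcher_stats = stats.clone();
    /// tokio::spawn(async move { watcher.watch(watcher_stats, shutdown_rx).await });
    /// // ... later, to stop the watcher:
    /// let _ = shutdown_tx.send(());
    /// ```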
    pub async fn watch(
        &mut self,
        stats: Arc<RwLock<DaemonStats>>,
        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        for dir in &self.watch_dirs {
            if dir.exists() {
                tracing::info!("Will watch for session files in {:?}", dir);
            } else {
                tracing::info!("Watch directory does not exist yet: {:?}", dir);
            }
        }

        let (tx, mut rx) = mpsc::channel::<Vec<DebouncedEvent>>(100);

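        // Debounce bursts of events for 500ms and forward only `.jsonl` /
        // `.vscdb` paths from notify's callback into the async select loop.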
        let mut debouncer = new_debouncer(
            Duration::from_millis(500),
            move |events: Result<Vec<DebouncedEvent>, notify::Error>| {
                if let Ok(events) = events {
                    let filtered: Vec<DebouncedEvent> = events
                        .into_iter()
                        .filter(|e| {
                            let ext = e.path.extension().and_then(|ext| ext.to_str());
                            matches!(ext, Some("jsonl") | Some("vscdb"))
                        })
                        .collect();

                    if !filtered.is_empty() {
                        let _ = tx.blocking_send(filtered);
                    }
                }
            },
        )
        .context("Failed to create file watcher")?;

        for dir in &self.watch_dirs {
            if dir.exists() {
                debouncer
                    .watcher()
                    .watch(dir, RecursiveMode::Recursive)
                    .context(format!("Failed to start watching directory {dir:?}"))?;

                tracing::info!("Watching for session files in {:?}", dir);
            }
        }

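        // Catch up on files that already exist on disk before handling live events.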
        self.initial_scan(&stats).await?;

        loop {
            tokio::select! {
                Some(events) = rx.recv() => {
                    for event in events {
                        if let Err(e) = self.handle_file_event(&event.path, &stats).await {
                            tracing::warn!("Error handling file event for {:?}: {}", event.path, e);
                            let mut stats_guard = stats.write().await;
                            stats_guard.errors += 1;
                        }
                    }
                }
                _ = shutdown_rx.recv() => {
                    tracing::info!("Session watcher shutting down");
                    break;
                }
            }
        }

        Ok(())
    }

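    /// Opens a database handle for this watcher's configured path.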
    fn open_db(&self) -> Result<Database> {
        self.db_config.open()
    }

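    /// Imports every session source that already exists on disk and seeds the
    /// `files_watched` statistic.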
    async fn initial_scan(&mut self, stats: &Arc<RwLock<DaemonStats>>) -> Result<()> {
        tracing::info!("Performing initial scan of session files...");

        let registry = default_registry();
        let mut total_files = 0;

        for watcher in registry.available_watchers() {
            let watcher_name = watcher.info().name;
            match watcher.find_sources() {
                Ok(sources) => {
                    tracing::info!("Found {} sources for {}", sources.len(), watcher_name);
                    total_files += sources.len();

                    for path in sources {
                        match self.process_file_sync(&path) {
                            Ok(Some((sessions_imported, messages_imported))) => {
                                let mut stats_guard = stats.write().await;
                                stats_guard.sessions_imported += sessions_imported;
                                stats_guard.messages_imported += messages_imported;
                                stats_guard.files_watched = self.file_positions.len();
                            }
                            Ok(None) => {}
                            Err(e) => {
                                tracing::warn!("Failed to import {:?}: {}", path, e);
                                let mut stats_guard = stats.write().await;
                                stats_guard.errors += 1;
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("Failed to find sources for {}: {}", watcher_name, e);
                }
            }
        }

        {
            let mut stats_guard = stats.write().await;
            stats_guard.files_watched = total_files;
        }

        Ok(())
    }

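    /// Handles a single debounced filesystem event: ignores irrelevant files,
    /// forgets deleted ones, and imports anything with new content.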
    async fn handle_file_event(
        &mut self,
        path: &Path,
        stats: &Arc<RwLock<DaemonStats>>,
    ) -> Result<()> {
        let ext = path.extension().and_then(|e| e.to_str());

        if !matches!(ext, Some("jsonl") | Some("vscdb")) {
            return Ok(());
        }

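        // Files named "agent-*" are intentionally skipped.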
        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
            if name.starts_with("agent-") {
                return Ok(());
            }
        }

        if !path.exists() {
            self.file_positions.remove(path);
            return Ok(());
        }

        match self.process_file_sync(path) {
            Ok(Some((sessions_imported, messages_imported))) => {
                let mut stats_guard = stats.write().await;
                stats_guard.sessions_imported += sessions_imported;
                stats_guard.messages_imported += messages_imported;
                stats_guard.files_watched = self.file_positions.len();
            }
            Ok(None) => {}
            Err(e) => {
                return Err(e);
            }
        }

        Ok(())
    }

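    /// Decides whether `path` has new content to import (based on file size
    /// and whether a session for this source already exists) and imports it if
    /// so. Returns `Some((sessions, messages))` when an import ran, `None`
    /// otherwise.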
    fn process_file_sync(&mut self, path: &Path) -> Result<Option<(u64, u64)>> {
        let db = self.open_db()?;
        let path_str = path.to_string_lossy();
        let last_pos = self.file_positions.get(path).copied().unwrap_or(0);

        let metadata = std::fs::metadata(path).context("Failed to get file metadata")?;
        let current_size = metadata.len();

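        // No new bytes. If the file shrank it was probably truncated or
        // replaced, so reset the offset and wait for the next event.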
        if current_size <= last_pos {
            if current_size < last_pos {
                self.file_positions.insert(path.to_path_buf(), 0);
            }
            return Ok(None);
        }

        if db.session_exists_by_source(&path_str)? {
            self.file_positions.insert(path.to_path_buf(), current_size);
            return Ok(None);
        }

        let result = self.import_file_sync(path, &db)?;

        self.file_positions.insert(path.to_path_buf(), current_size);

        Ok(Some(result))
    }

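    /// Parses `path` with the first registry watcher that recognizes it and
    /// writes the resulting sessions and messages to the database, keeping the
    /// session's git branch in sync with the newest one seen in its messages.
    /// Returns `(sessions_imported, messages_imported)`.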
    fn import_file_sync(&mut self, path: &Path, db: &Database) -> Result<(u64, u64)> {
        tracing::debug!("Importing session file: {:?}", path);

        let path_buf = path.to_path_buf();
        let registry = default_registry();

        let mut parsed_sessions = Vec::new();

        for watcher in registry.available_watchers() {
            match watcher.parse_source(&path_buf) {
                Ok(sessions) if !sessions.is_empty() => {
                    parsed_sessions = sessions;
                    break;
                }
                Ok(_) => continue,
                Err(e) => {
                    tracing::debug!(
                        "Watcher {} could not parse {:?}: {}",
                        watcher.info().name,
                        path,
                        e
                    );
                }
            }
        }

        if parsed_sessions.is_empty() {
            tracing::debug!("No watcher could parse {:?}", path);
            return Ok((0, 0));
        }

        let mut total_sessions = 0u64;
        let mut total_messages = 0u64;

        for (session, messages) in parsed_sessions {
            if messages.is_empty() {
                continue;
            }

            let message_count = messages.len();

            db.insert_session(&session)?;

            let mut latest_branch: Option<String> = None;
            for msg in &messages {
                db.insert_message(msg)?;
                if msg.git_branch.is_some() {
                    latest_branch = msg.git_branch.clone();
                }
            }

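            // Persist the newest branch seen in the messages if it differs
            // from the one stored on the session.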
            if let Some(ref new_branch) = latest_branch {
                if session.git_branch.as_ref() != Some(new_branch) {
                    if let Err(e) = db.update_session_branch(session.id, new_branch) {
                        tracing::warn!(
                            "Failed to update session branch for {}: {}",
                            &session.id.to_string()[..8],
                            e
                        );
                    } else {
                        tracing::debug!(
                            "Updated session {} branch to {}",
                            &session.id.to_string()[..8],
                            new_branch
                        );
                    }
                }
            }

            tracing::info!(
                "Imported session {} with {} messages from {:?}",
                &session.id.to_string()[..8],
                message_count,
                path.file_name().unwrap_or_default()
            );

            total_sessions += 1;
            total_messages += message_count as u64;
        }

        if let Ok(metadata) = std::fs::metadata(path) {
            self.file_positions
                .insert(path.to_path_buf(), metadata.len());
        }

        Ok((total_sessions, total_messages))
    }

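    /// Number of files whose read positions are currently tracked.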
    #[allow(dead_code)]
    pub fn tracked_file_count(&self) -> usize {
        self.file_positions.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_session_watcher_creation() {
        let watcher = SessionWatcher::new();
        assert!(watcher.is_ok(), "Should create watcher successfully");

        let _watcher = watcher.unwrap();
    }

    #[test]
    fn test_watch_dirs_from_registry() {
        use crate::capture::watchers::default_registry;

        let registry = default_registry();
        let all_watchers = registry.all_watchers();

        let all_paths: Vec<_> = all_watchers.iter().flat_map(|w| w.watch_paths()).collect();

        let has_claude = all_paths
            .iter()
            .any(|d| d.to_string_lossy().contains(".claude"));
        let has_cursor = all_paths
            .iter()
            .any(|d| d.to_string_lossy().contains("Cursor"));

        assert!(
            has_claude || has_cursor,
            "Registry should configure at least one known watcher path pattern \
             (expected .claude or Cursor in paths). Found paths: {all_paths:?}"
        );
    }

    #[test]
    fn test_tracked_file_count_initial() {
        let watcher = SessionWatcher::new().unwrap();
        assert_eq!(
            watcher.tracked_file_count(),
            0,
            "Should start with no tracked files"
        );
    }

    #[test]
    fn test_db_config_creation() {
        let config = DbConfig::default_config();
        assert!(config.is_ok(), "Should create DbConfig successfully");
    }

    #[test]
    fn test_file_position_tracking() {
        let mut watcher = SessionWatcher::new().unwrap();

        let path1 = PathBuf::from("/test/file1.jsonl");
        let path2 = PathBuf::from("/test/file2.jsonl");

        watcher.file_positions.insert(path1.clone(), 100);
        watcher.file_positions.insert(path2.clone(), 200);

        assert_eq!(watcher.tracked_file_count(), 2);
        assert_eq!(watcher.file_positions.get(&path1), Some(&100));
        assert_eq!(watcher.file_positions.get(&path2), Some(&200));
    }
}