lore_cli/daemon/
watcher.rs1use anyhow::{Context, Result};
11use notify::RecursiveMode;
12use notify_debouncer_mini::{new_debouncer, DebouncedEvent};
13use std::collections::HashMap;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::{mpsc, RwLock};
18
19use crate::capture::watchers::default_registry;
20use crate::storage::Database;
21
22use super::state::DaemonStats;
23
/// Location of the database that session data is written to.
///
/// Holds only the filesystem path; a database handle is produced on demand by
/// `DbConfig::open`.
#[derive(Debug, Clone)]
pub struct DbConfig {
    /// Filesystem path handed to `Database::open`.
    path: PathBuf,
}
31
32impl DbConfig {
33 pub fn default_config() -> Result<Self> {
35 let path = crate::storage::db::default_db_path()?;
36 Ok(Self { path })
37 }
38
39 pub fn open(&self) -> Result<Database> {
41 Database::open(&self.path)
42 }
43}
44
/// Watches session files on disk and imports them into the database.
pub struct SessionWatcher {
    /// File size in bytes recorded the last time each path was processed
    /// (reset to 0 when a file is observed to have shrunk).
    file_positions: HashMap<PathBuf, u64>,
    /// Directories to watch, as reported by the watcher registry in `new`.
    watch_dirs: Vec<PathBuf>,
    /// Used to open the database whenever a file needs to be imported.
    db_config: DbConfig,
}
57
58impl SessionWatcher {
59 pub fn new() -> Result<Self> {
68 let registry = default_registry();
69 let watch_dirs = registry.all_watch_paths();
70
71 let db_config = DbConfig::default_config()?;
72
73 Ok(Self {
74 file_positions: HashMap::new(),
75 watch_dirs,
76 db_config,
77 })
78 }
79
80 #[allow(dead_code)]
85 pub fn watch_dirs(&self) -> &[PathBuf] {
86 &self.watch_dirs
87 }
88
89 pub async fn watch(
102 &mut self,
103 stats: Arc<RwLock<DaemonStats>>,
104 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
105 ) -> Result<()> {
106 for dir in &self.watch_dirs {
108 if dir.exists() {
109 tracing::info!("Will watch for session files in {:?}", dir);
110 } else {
111 tracing::info!("Watch directory does not exist yet: {:?}", dir);
112 }
113 }
114
115 let (tx, mut rx) = mpsc::channel::<Vec<DebouncedEvent>>(100);
117
118 let mut debouncer = new_debouncer(
120 Duration::from_millis(500),
121 move |events: Result<Vec<DebouncedEvent>, notify::Error>| {
122 if let Ok(events) = events {
123 let filtered: Vec<DebouncedEvent> = events
125 .into_iter()
126 .filter(|e| {
127 let ext = e.path.extension().and_then(|ext| ext.to_str());
128 matches!(ext, Some("jsonl") | Some("vscdb"))
129 })
130 .collect();
131
132 if !filtered.is_empty() {
133 let _ = tx.blocking_send(filtered);
134 }
135 }
136 },
137 )
138 .context("Failed to create file watcher")?;
139
140 for dir in &self.watch_dirs {
142 if dir.exists() {
143 debouncer
144 .watcher()
145 .watch(dir, RecursiveMode::Recursive)
146 .context(format!("Failed to start watching directory {dir:?}"))?;
147
148 tracing::info!("Watching for session files in {:?}", dir);
149 }
150 }
151
152 self.initial_scan(&stats).await?;
154
155 loop {
157 tokio::select! {
158 Some(events) = rx.recv() => {
159 for event in events {
160 if let Err(e) = self.handle_file_event(&event.path, &stats).await {
161 let error_msg = e.to_string();
162 if error_msg.contains("unable to open database")
165 || error_msg.contains("database is locked")
166 {
167 tracing::debug!(
168 "Database temporarily unavailable for {:?}: {}",
169 event.path,
170 e
171 );
172 } else {
173 tracing::warn!(
174 "Error handling file event for {:?}: {}",
175 event.path,
176 e
177 );
178 let mut stats_guard = stats.write().await;
179 stats_guard.errors += 1;
180 }
181 }
182 }
183 }
184 _ = shutdown_rx.recv() => {
185 tracing::info!("Session watcher shutting down");
186 break;
187 }
188 }
189 }
190
191 Ok(())
192 }
193
194 fn open_db(&self) -> Result<Database> {
196 self.db_config.open()
197 }
198
199 async fn initial_scan(&mut self, stats: &Arc<RwLock<DaemonStats>>) -> Result<()> {
205 tracing::info!("Performing initial scan of session files...");
206
207 let registry = default_registry();
208 let mut total_files = 0;
209
210 for watcher in registry.available_watchers() {
211 let watcher_name = watcher.info().name;
212 match watcher.find_sources() {
213 Ok(sources) => {
214 tracing::info!("Found {} sources for {}", sources.len(), watcher_name);
215 total_files += sources.len();
216
217 for path in sources {
218 match self.process_file_sync(&path) {
220 Ok(Some((sessions_imported, messages_imported))) => {
221 let mut stats_guard = stats.write().await;
222 stats_guard.sessions_imported += sessions_imported;
223 stats_guard.messages_imported += messages_imported;
224 stats_guard.files_watched = self.file_positions.len();
225 }
226 Ok(None) => {
227 }
229 Err(e) => {
230 tracing::warn!("Failed to import {:?}: {}", path, e);
231 let mut stats_guard = stats.write().await;
232 stats_guard.errors += 1;
233 }
234 }
235 }
236 }
237 Err(e) => {
238 tracing::warn!("Failed to find sources for {}: {}", watcher_name, e);
239 }
240 }
241 }
242
243 {
244 let mut stats_guard = stats.write().await;
245 stats_guard.files_watched = total_files;
246 }
247
248 Ok(())
249 }
250
251 async fn handle_file_event(
253 &mut self,
254 path: &Path,
255 stats: &Arc<RwLock<DaemonStats>>,
256 ) -> Result<()> {
257 let ext = path.extension().and_then(|e| e.to_str());
258
259 if !matches!(ext, Some("jsonl") | Some("vscdb")) {
261 return Ok(());
262 }
263
264 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
266 if name.starts_with("agent-") {
267 return Ok(());
268 }
269 }
270
271 if !path.exists() {
273 self.file_positions.remove(path);
275 return Ok(());
276 }
277
278 match self.process_file_sync(path) {
280 Ok(Some((sessions_imported, messages_imported))) => {
281 let mut stats_guard = stats.write().await;
282 stats_guard.sessions_imported += sessions_imported;
283 stats_guard.messages_imported += messages_imported;
284 stats_guard.files_watched = self.file_positions.len();
285 }
286 Ok(None) => {
287 }
289 Err(e) => {
290 return Err(e);
291 }
292 }
293
294 Ok(())
295 }
296
297 fn process_file_sync(&mut self, path: &Path) -> Result<Option<(u64, u64)>> {
302 let db = self.open_db()?;
303 let path_str = path.to_string_lossy();
304 let last_pos = self.file_positions.get(path).copied().unwrap_or(0);
305
306 let metadata = std::fs::metadata(path).context("Failed to get file metadata")?;
308 let current_size = metadata.len();
309
310 if current_size <= last_pos {
311 if current_size < last_pos {
313 self.file_positions.insert(path.to_path_buf(), 0);
315 }
316 return Ok(None);
317 }
318
319 if db.session_exists_by_source(&path_str)? {
321 self.file_positions.insert(path.to_path_buf(), current_size);
323 return Ok(None);
324 }
325
326 let result = self.import_file_sync(path, &db)?;
328
329 self.file_positions.insert(path.to_path_buf(), current_size);
331
332 Ok(Some(result))
333 }
334
335 fn import_file_sync(&mut self, path: &Path, db: &Database) -> Result<(u64, u64)> {
340 tracing::debug!("Importing session file: {:?}", path);
341
342 let path_buf = path.to_path_buf();
343 let registry = default_registry();
344
345 let mut parsed_sessions = Vec::new();
347
348 for watcher in registry.available_watchers() {
349 match watcher.parse_source(&path_buf) {
350 Ok(sessions) if !sessions.is_empty() => {
351 parsed_sessions = sessions;
352 break;
353 }
354 Ok(_) => continue,
355 Err(e) => {
356 tracing::debug!(
357 "Watcher {} could not parse {:?}: {}",
358 watcher.info().name,
359 path,
360 e
361 );
362 }
363 }
364 }
365
366 if parsed_sessions.is_empty() {
367 tracing::debug!("No watcher could parse {:?}", path);
368 return Ok((0, 0));
369 }
370
371 let mut total_sessions = 0u64;
372 let mut total_messages = 0u64;
373
374 for (session, messages) in parsed_sessions {
375 if messages.is_empty() {
376 continue;
377 }
378
379 let message_count = messages.len();
380
381 db.insert_session(&session)?;
383
384 let mut latest_branch: Option<String> = None;
386 for msg in &messages {
387 db.insert_message(msg)?;
388 if msg.git_branch.is_some() {
390 latest_branch = msg.git_branch.clone();
391 }
392 }
393
394 if let Some(ref new_branch) = latest_branch {
397 if session.git_branch.as_ref() != Some(new_branch) {
398 if let Err(e) = db.update_session_branch(session.id, new_branch) {
399 tracing::warn!(
400 "Failed to update session branch for {}: {}",
401 &session.id.to_string()[..8],
402 e
403 );
404 } else {
405 tracing::debug!(
406 "Updated session {} branch to {}",
407 &session.id.to_string()[..8],
408 new_branch
409 );
410 }
411 }
412 }
413
414 tracing::info!(
415 "Imported session {} with {} messages from {:?}",
416 &session.id.to_string()[..8],
417 message_count,
418 path.file_name().unwrap_or_default()
419 );
420
421 total_sessions += 1;
422 total_messages += message_count as u64;
423 }
424
425 if let Ok(metadata) = std::fs::metadata(path) {
427 self.file_positions
428 .insert(path.to_path_buf(), metadata.len());
429 }
430
431 Ok((total_sessions, total_messages))
432 }
433
434 #[allow(dead_code)]
439 pub fn tracked_file_count(&self) -> usize {
440 self.file_positions.len()
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// A watcher can be constructed from the default registry and database path.
    #[test]
    fn test_session_watcher_creation() {
        let created = SessionWatcher::new();
        assert!(created.is_ok(), "Should create watcher successfully");

        let _watcher = created.unwrap();
    }

    /// The registry exposes at least one of the known watch-path patterns.
    #[test]
    fn test_watch_dirs_from_registry() {
        use crate::capture::watchers::default_registry;

        let registry = default_registry();
        let all_watchers = registry.all_watchers();

        let all_paths: Vec<_> = all_watchers.iter().flat_map(|w| w.watch_paths()).collect();

        // True when any configured path contains `needle`.
        let mentions =
            |needle: &str| all_paths.iter().any(|d| d.to_string_lossy().contains(needle));

        let has_claude = mentions(".claude");
        let has_cursor = mentions("Cursor");

        assert!(
            has_claude || has_cursor,
            "Registry should configure at least one known watcher path pattern \
             (expected .claude or Cursor in paths). Found paths: {all_paths:?}"
        );
    }

    /// A freshly created watcher tracks no files.
    #[test]
    fn test_tracked_file_count_initial() {
        let watcher = SessionWatcher::new().unwrap();
        assert_eq!(
            watcher.tracked_file_count(),
            0,
            "Should start with no tracked files"
        );
    }

    /// The default database configuration can be resolved.
    #[test]
    fn test_db_config_creation() {
        let config = DbConfig::default_config();
        assert!(config.is_ok(), "Should create DbConfig successfully");
    }

    /// Positions inserted per path are counted and read back unchanged.
    #[test]
    fn test_file_position_tracking() {
        let mut watcher = SessionWatcher::new().unwrap();

        let tracked = [
            (PathBuf::from("/test/file1.jsonl"), 100u64),
            (PathBuf::from("/test/file2.jsonl"), 200u64),
        ];

        for (path, pos) in &tracked {
            watcher.file_positions.insert(path.clone(), *pos);
        }

        assert_eq!(watcher.tracked_file_count(), 2);
        for (path, pos) in &tracked {
            assert_eq!(watcher.file_positions.get(path), Some(pos));
        }
    }
}