Skip to main content

lore_engine/engine/
watcher.rs

1//! File watcher — monitors a vault directory for external `.md` changes and
2//! syncs them into the DB and graph. Uses debounced `notify` events (500ms).
3
4use super::{db, file_loader, vault_ops};
5use super::error::LoreError;
6use crate::state::AppState;
7use notify::RecursiveMode;
8use notify_debouncer_mini::{new_debouncer, DebouncedEvent, Debouncer};
9use serde::Serialize;
10use std::collections::HashSet;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::Duration;
14
15/// A page that was created or modified by an external change.
16#[derive(Debug, Clone, Serialize)]
17pub struct ChangedPage {
18    pub slug: String,
19    pub title: String,
20}
21
22/// Payload emitted when the watcher detects vault changes.
23#[derive(Debug, Clone, Serialize)]
24pub struct VaultChangedPayload {
25    pub changed: Vec<ChangedPage>,
26    pub deleted: Vec<String>,
27    pub page_count: usize,
28    pub link_count: usize,
29}
30
/// Callback type for vault change notifications.
///
/// The bridge layer sets this to emit a Qt signal. The callback is shared via
/// `Arc` and must be `Send + Sync` because it is moved into the watcher's
/// event-handler closure. It receives one [`VaultChangedPayload`] per batch of
/// events that produced at least one changed or deleted page.
pub type OnVaultChanged = Arc<dyn Fn(VaultChangedPayload) + Send + Sync>;
34
35/// Start watching a vault directory for .md file changes.
36/// Returns the debouncer handle — drop it to stop watching.
37pub fn start_watcher(
38    vault_path: &Path,
39    state: Arc<AppState>,
40    on_change: OnVaultChanged,
41) -> Result<Debouncer<notify::RecommendedWatcher>, LoreError> {
42    let vault = vault_path.to_path_buf();
43
44    let mut debouncer = new_debouncer(
45        Duration::from_millis(500),
46        move |result: Result<Vec<DebouncedEvent>, notify::Error>| {
47            match result {
48                Ok(events) => {
49                    if let Err(e) = handle_events(&state, &on_change, &vault, events) {
50                        log::error!("Watcher event handler error: {e}");
51                    }
52                }
53                Err(e) => log::error!("Watcher error: {e}"),
54            }
55        },
56    )
57    .map_err(|e| LoreError::WatcherError(e.to_string()))?;
58
59    debouncer
60        .watcher()
61        .watch(vault_path, RecursiveMode::Recursive)
62        .map_err(|e| LoreError::WatcherError(e.to_string()))?;
63
64    log::info!("File watcher started for {}", vault_path.display());
65    Ok(debouncer)
66}
67
68fn handle_events(
69    state: &AppState,
70    on_change: &OnVaultChanged,
71    vault_path: &Path,
72    events: Vec<DebouncedEvent>,
73) -> Result<(), LoreError> {
74    let mut paths: HashSet<PathBuf> = HashSet::new();
75    for event in &events {
76        // Only process paths within the vault directory
77        if event.path.starts_with(vault_path) {
78            paths.insert(event.path.clone());
79        } else {
80            log::debug!("Ignoring event outside vault: {}", event.path.display());
81        }
82    }
83
84    if paths.is_empty() {
85        return Ok(());
86    }
87
88    let payload = {
89        let db_lock = state.db.lock()?;
90        let conn = match db_lock.as_ref() {
91            Some(c) => c,
92            None => return Ok(()),
93        };
94
95        let existing_hashes = db::get_all_hashes(conn)?;
96        let known_slugs = vault_ops::get_known_slugs_from(conn)?;
97        let mut changed_pages: Vec<ChangedPage> = Vec::new();
98        let mut deleted_slugs: Vec<String> = Vec::new();
99
100        conn.execute_batch("BEGIN TRANSACTION")?;
101
102        for path in &paths {
103            if path.exists() {
104                handle_file_change(conn, vault_path, path, &existing_hashes, &known_slugs, &mut changed_pages)?;
105            } else {
106                handle_file_delete(conn, vault_path, path, &mut deleted_slugs)?;
107            }
108        }
109
110        conn.execute_batch("COMMIT")?;
111
112        if changed_pages.is_empty() && deleted_slugs.is_empty() {
113            return Ok(());
114        }
115
116        let mut graph = state.graph.lock()?;
117
118        for cp in &changed_pages {
119            graph.upsert_node(&cp.slug, &cp.title, false);
120
121            let targets = db::get_outgoing_targets(conn, &cp.slug)?;
122            graph.set_outgoing_edges(&cp.slug, &targets);
123
124            for target in &targets {
125                if graph.degree(target) == 0 {
126                    if let Some(p) = db::get_page(conn, target)? {
127                        graph.upsert_node(target, &p.title, p.is_placeholder);
128                    }
129                }
130            }
131        }
132
133        for slug in &deleted_slugs {
134            if let Some(p) = db::get_page(conn, slug)? {
135                graph.upsert_node(slug, &p.title, true);
136                graph.set_outgoing_edges(slug, &[]);
137            } else {
138                graph.remove_node(slug);
139            }
140        }
141
142        let orphans = graph.cleanup_orphaned_placeholders();
143        for slug in &orphans {
144            if let Err(e) = db::delete_page(conn, slug) {
145                log::warn!("Failed to clean up orphan placeholder '{slug}': {e}");
146            }
147        }
148
149        let (page_count, link_count) = graph.stats();
150
151        VaultChangedPayload {
152            changed: changed_pages,
153            deleted: deleted_slugs,
154            page_count,
155            link_count,
156        }
157    };
158
159    log::info!(
160        "Watcher: {} changed, {} deleted",
161        payload.changed.len(),
162        payload.deleted.len()
163    );
164
165    on_change(payload);
166
167    Ok(())
168}
169
170fn handle_file_change(
171    conn: &rusqlite::Connection,
172    vault_path: &Path,
173    path: &Path,
174    existing_hashes: &std::collections::HashMap<String, String>,
175    known_slugs: &HashSet<String>,
176    changed: &mut Vec<ChangedPage>,
177) -> Result<(), LoreError> {
178    let scanned = match file_loader::scan_single_file(vault_path, path) {
179        Some(f) => f,
180        None => return Ok(()),
181    };
182
183    // Self-write detection: skip if hash matches AND page already exists in DB
184    let page_exists = db::get_page(conn, &scanned.slug)?.is_some();
185    if page_exists {
186        if let Some(old_hash) = existing_hashes.get(&scanned.slug) {
187            if old_hash == &scanned.content_hash {
188                // Hash matches — DB is already up to date. Still report as changed
189                // so in-memory state (graph, page list) gets refreshed.
190                changed.push(ChangedPage {
191                    slug: scanned.slug,
192                    title: scanned.title,
193                });
194                return Ok(());
195            }
196        }
197    }
198
199    db::upsert_page(
200        conn,
201        &scanned.slug,
202        &scanned.title,
203        Some(&scanned.relative_path),
204        Some(&scanned.content_hash),
205        false,
206    )?;
207
208    // Use shared link-sync logic (with fuzzy resolution)
209    vault_ops::sync_page_links(conn, &scanned.slug, &scanned.title, &scanned.content, known_slugs)?;
210
211    changed.push(ChangedPage {
212        slug: scanned.slug,
213        title: scanned.title,
214    });
215
216    Ok(())
217}
218
219fn handle_file_delete(
220    conn: &rusqlite::Connection,
221    vault_path: &Path,
222    path: &Path,
223    deleted: &mut Vec<String>,
224) -> Result<(), LoreError> {
225    let relative = match path.strip_prefix(vault_path) {
226        Ok(r) => r.to_string_lossy().replace('\\', "/"),
227        Err(_) => return Ok(()),
228    };
229    let slug = file_loader::slug_from_path(&relative);
230
231    let page = match db::get_page(conn, &slug)? {
232        Some(p) => p,
233        None => return Ok(()),
234    };
235
236    let backlinks = db::get_backlinks(conn, &slug)?;
237    if backlinks.is_empty() {
238        db::delete_page(conn, &slug)?;
239    } else {
240        db::upsert_page(conn, &slug, &page.title, None, None, true)?;
241        db::delete_links_from(conn, &slug)?;
242        db::delete_fts(conn, &slug)?;
243    }
244
245    deleted.push(slug);
246    Ok(())
247}