Skip to main content

rover/storage/
hooks.rs

1//! SQLite `update_hook` plumbing for fast in-process task-insert wakeups.
2//!
3//! The scheduler's polling tick (default 10s) is the cross-process safety
4//! net. Inside a single process, we want sub-second pickup whenever any
5//! code path inserts into `tasks` without going through the scheduler's
6//! mpsc channel (e.g. the MCP `fetch` revalidate path, or the CLI batch
7//! enqueue). SQLite's `update_hook` gives us exactly that — **but only**
8//! for writes via the same `Connection` the hook is installed on.
9//!
10//! Because the storage layer funnels every write through the single
11//! `tokio-rusqlite` actor that `Db` owns, installing the hook on that
12//! actor's connection captures every write performed through this `Db`
13//! handle (and any clone of it — `Db: Clone` shares the same actor).
14//! Code that opens a *separate* `Db` to the same file uses a *different*
15//! actor connection; those writes are not observed and fall back to the
16//! polling tick. Cross-OS-process writes always rely on the polling tick.
17//!
18//! The hook captures a `Weak<Notify>`; once the scheduler drops its
19//! `Arc<Notify>`, subsequent fires become cheap no-ops. The returned
20//! [`UpdateHookGuard`] schedules a best-effort detach on drop.
21//!
22//! Lives inside `src/storage/` per the workspace rule that raw
23//! `rusqlite::Connection` access stays storage-internal.
24
25use std::sync::{Arc, Weak};
26
27use rusqlite::hooks::Action;
28use tokio::sync::Notify;
29use tokio_rusqlite::Connection;
30
31use super::{Db, StorageError};
32
33/// RAII guard for the installed `update_hook`. Dropping the guard sends a
34/// fire-and-forget detach (`update_hook(None)`) to the storage actor when
35/// a Tokio runtime is available; otherwise the hook is left in place and
36/// the captured `Weak<Notify>` simply lapses.
37pub struct UpdateHookGuard {
38    conn: Connection,
39}
40
41impl std::fmt::Debug for UpdateHookGuard {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        f.debug_struct("UpdateHookGuard").finish_non_exhaustive()
44    }
45}
46
47impl Drop for UpdateHookGuard {
48    fn drop(&mut self) {
49        let conn = self.conn.clone();
50        if tokio::runtime::Handle::try_current().is_ok() {
51            tokio::spawn(async move {
52                let _ = conn
53                    .call(|c| {
54                        c.update_hook(None::<fn(Action, &str, &str, i64)>);
55                        Ok::<_, rusqlite::Error>(())
56                    })
57                    .await;
58            });
59        }
60    }
61}
62
63/// Register a SQLite `update_hook` on `db`'s actor connection that fires
64/// `notify` on every insert/update of the `tasks` table.
65///
66/// The hook holds a `Weak` reference to `notify`; when the caller drops
67/// its `Arc<Notify>`, subsequent fires become no-ops. Drop the returned
68/// guard to eagerly detach.
69pub async fn register_tasks_update_hook(
70    db: &Db,
71    notify: Arc<Notify>,
72) -> Result<UpdateHookGuard, StorageError> {
73    let weak: Weak<Notify> = Arc::downgrade(&notify);
74    let conn = db.conn.clone();
75    let conn_for_guard = conn.clone();
76    conn.call(move |c| {
77        c.update_hook(Some(
78            move |action: Action, _db: &str, table: &str, _rowid: i64| {
79                if table == "tasks"
80                    && matches!(action, Action::SQLITE_INSERT | Action::SQLITE_UPDATE)
81                    && let Some(n) = weak.upgrade()
82                {
83                    n.notify_one();
84                }
85            },
86        ));
87        Ok::<_, rusqlite::Error>(())
88    })
89    .await?;
90    Ok(UpdateHookGuard {
91        conn: conn_for_guard,
92    })
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Db;
    use std::time::Duration;

    /// Statement used to add a single pending task row.
    const INSERT_TASK_SQL: &str =
        "INSERT INTO tasks (id, kind, status, created_at, updated_at, params_json) \
                     VALUES (?1, 'batch_fetch', 'pending', 0, 0, '{}')";

    /// Inserting into `tasks` through the same `Db` the hook was registered
    /// on must wake the `Notify` within the timeout.
    #[tokio::test]
    async fn insert_via_same_db_fires_hook() {
        let scratch = tempfile::tempdir().unwrap();
        let db_file = scratch.path().join("rover.db");
        let db = Db::open(&db_file).await.unwrap();

        let wakeup = Arc::new(Notify::new());
        // Keep the guard bound for the whole test so the hook stays installed.
        let _guard = register_tasks_update_hook(&db, Arc::clone(&wakeup))
            .await
            .unwrap();

        let insert = db.conn.call(|c| {
            c.execute(INSERT_TASK_SQL, rusqlite::params!["hook-test-id"])
                .map(|_| ())
        });
        insert.await.unwrap();

        let fired = tokio::time::timeout(Duration::from_millis(200), wakeup.notified()).await;
        fired.expect("notify did not fire within 200ms");
    }
}