rover/storage/hooks.rs
//! SQLite `update_hook` plumbing for fast in-process task-insert wakeups.
//!
//! The scheduler's polling tick (default 10s) is the cross-process safety
//! net. Inside a single process, we want sub-second pickup whenever any
//! code path inserts into `tasks` without going through the scheduler's
//! mpsc channel (e.g. the MCP `fetch` revalidate path, or the CLI batch
//! enqueue). SQLite's `update_hook` gives us exactly that — **but only**
//! for writes via the same `Connection` the hook is installed on.
//!
//! Because the storage layer funnels every write through the single
//! `tokio-rusqlite` actor that `Db` owns, installing the hook on that
//! actor's connection captures every write performed through this `Db`
//! handle (and any clone of it — `Db: Clone` shares the same actor).
//! Code that opens a *separate* `Db` to the same file uses a *different*
//! actor connection; those writes are not observed and fall back to the
//! polling tick. Cross-OS-process writes always rely on the polling tick.
//!
//! The hook captures a `Weak<Notify>`; once the scheduler drops its
//! `Arc<Notify>`, subsequent fires become cheap no-ops. The returned
//! [`UpdateHookGuard`] schedules a best-effort detach on drop.
//!
//! Lives inside `src/storage/` per the workspace rule that raw
//! `rusqlite::Connection` access stays storage-internal.
24
25use std::sync::{Arc, Weak};
26
27use rusqlite::hooks::Action;
28use tokio::sync::Notify;
29use tokio_rusqlite::Connection;
30
31use super::{Db, StorageError};
32
33/// RAII guard for the installed `update_hook`. Dropping the guard sends a
34/// fire-and-forget detach (`update_hook(None)`) to the storage actor when
35/// a Tokio runtime is available; otherwise the hook is left in place and
36/// the captured `Weak<Notify>` simply lapses.
37pub struct UpdateHookGuard {
38 conn: Connection,
39}
40
41impl std::fmt::Debug for UpdateHookGuard {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("UpdateHookGuard").finish_non_exhaustive()
44 }
45}
46
47impl Drop for UpdateHookGuard {
48 fn drop(&mut self) {
49 let conn = self.conn.clone();
50 if tokio::runtime::Handle::try_current().is_ok() {
51 tokio::spawn(async move {
52 let _ = conn
53 .call(|c| {
54 c.update_hook(None::<fn(Action, &str, &str, i64)>);
55 Ok::<_, rusqlite::Error>(())
56 })
57 .await;
58 });
59 }
60 }
61}
62
63/// Register a SQLite `update_hook` on `db`'s actor connection that fires
64/// `notify` on every insert/update of the `tasks` table.
65///
66/// The hook holds a `Weak` reference to `notify`; when the caller drops
67/// its `Arc<Notify>`, subsequent fires become no-ops. Drop the returned
68/// guard to eagerly detach.
69pub async fn register_tasks_update_hook(
70 db: &Db,
71 notify: Arc<Notify>,
72) -> Result<UpdateHookGuard, StorageError> {
73 let weak: Weak<Notify> = Arc::downgrade(¬ify);
74 let conn = db.conn.clone();
75 let conn_for_guard = conn.clone();
76 conn.call(move |c| {
77 c.update_hook(Some(
78 move |action: Action, _db: &str, table: &str, _rowid: i64| {
79 if table == "tasks"
80 && matches!(action, Action::SQLITE_INSERT | Action::SQLITE_UPDATE)
81 && let Some(n) = weak.upgrade()
82 {
83 n.notify_one();
84 }
85 },
86 ));
87 Ok::<_, rusqlite::Error>(())
88 })
89 .await?;
90 Ok(UpdateHookGuard {
91 conn: conn_for_guard,
92 })
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::Db;
    use std::time::Duration;

    /// Statement used to add one pending task row; `?1` is the task id.
    const INSERT_TASK_SQL: &str =
        "INSERT INTO tasks (id, kind, status, created_at, updated_at, params_json) \
         VALUES (?1, 'batch_fetch', 'pending', 0, 0, '{}')";

    /// An insert into `tasks` made through the same `Db` the hook was
    /// registered on must wake the `Notify` within 200ms.
    #[tokio::test]
    async fn insert_via_same_db_fires_hook() {
        let scratch_dir = tempfile::tempdir().unwrap();
        let db_file = scratch_dir.path().join("rover.db");
        let db = Db::open(&db_file).await.unwrap();

        let notify = Arc::new(Notify::new());
        // Keep the guard bound for the whole test so the hook stays installed.
        let _guard = register_tasks_update_hook(&db, notify.clone())
            .await
            .unwrap();

        // Write through the very connection the hook is installed on.
        let insert = db.conn.call(|c| {
            c.execute(INSERT_TASK_SQL, rusqlite::params!["hook-test-id"])
                .map(|_rows| ())
        });
        insert.await.unwrap();

        let woke = tokio::time::timeout(Duration::from_millis(200), notify.notified()).await;
        woke.expect("notify did not fire within 200ms");
    }
}