Skip to main content

rover/storage/
servers.rs

//! `servers` table: tracks live `rover mcp` instances.
//!
//! Each running server inserts a row keyed by its own PID on startup and
//! refreshes `last_heartbeat` on a tokio interval. Stale rows (those whose
//! `last_heartbeat` is older than the configured threshold) are reaped at
//! startup. On clean shutdown a server deletes its own row.
7
8use std::time::Duration;
9
10use rusqlite::params;
11
12use super::{Db, StorageError};
13
/// Row shape returned by query helpers + used by tests.
///
/// Mirrors the four columns selected from the `servers` table by
/// `Db::list_servers`, in the same order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerRow {
    /// Process ID of the server instance; the upsert's `ON CONFLICT(pid)`
    /// clause treats it as the row's unique key.
    pub pid: i64,
    /// Version string supplied by the server when it registered itself.
    pub version: String,
    /// Value of `now_epoch()` at the time of the most recent upsert for
    /// this PID.
    pub started_at: i64,
    /// Value of `now_epoch()` at the most recent upsert or heartbeat for
    /// this PID.
    pub last_heartbeat: i64,
}
22
23impl Db {
24    /// Upsert the current process's row. If a row already exists for `pid`
25    /// (from a crashed prior run of this PID), its `started_at` is reset.
26    pub async fn upsert_server_self(&self, pid: i64, version: String) -> Result<(), StorageError> {
27        let now = now_epoch();
28        let version_for_sql = version.clone();
29        self.conn
30            .call(move |c| {
31                c.execute(
32                    "INSERT INTO servers(pid, version, started_at, last_heartbeat)
33                     VALUES (?1, ?2, ?3, ?3)
34                     ON CONFLICT(pid) DO UPDATE SET
35                       version = excluded.version,
36                       started_at = excluded.started_at,
37                       last_heartbeat = excluded.last_heartbeat",
38                    params![pid, version_for_sql, now],
39                )?;
40                Ok::<_, rusqlite::Error>(())
41            })
42            .await?;
43        Ok(())
44    }
45
46    /// Refresh `last_heartbeat` for `pid`. If no row exists, this is a no-op
47    /// (logged at the caller).
48    pub async fn heartbeat_server(&self, pid: i64) -> Result<(), StorageError> {
49        let now = now_epoch();
50        self.conn
51            .call(move |c| {
52                c.execute(
53                    "UPDATE servers SET last_heartbeat = ?1 WHERE pid = ?2",
54                    params![now, pid],
55                )?;
56                Ok::<_, rusqlite::Error>(())
57            })
58            .await?;
59        Ok(())
60    }
61
62    /// Delete rows whose `last_heartbeat` is older than `threshold` ago.
63    /// Returns the number of rows removed.
64    pub async fn reap_stale_servers(&self, threshold: Duration) -> Result<usize, StorageError> {
65        let cutoff = now_epoch() - threshold.as_secs() as i64;
66        let removed = self
67            .conn
68            .call(move |c| {
69                let n = c.execute(
70                    "DELETE FROM servers WHERE last_heartbeat < ?1",
71                    params![cutoff],
72                )?;
73                Ok::<_, rusqlite::Error>(n)
74            })
75            .await?;
76        Ok(removed)
77    }
78
79    /// Remove the row for `pid` (idempotent).
80    pub async fn delete_server_self(&self, pid: i64) -> Result<(), StorageError> {
81        self.conn
82            .call(move |c| {
83                c.execute("DELETE FROM servers WHERE pid = ?1", params![pid])?;
84                Ok::<_, rusqlite::Error>(())
85            })
86            .await?;
87        Ok(())
88    }
89
90    /// Read all rows. Used by tests and by future M3 integration scenarios.
91    pub async fn list_servers(&self) -> Result<Vec<ServerRow>, StorageError> {
92        let rows = self
93            .conn
94            .call(|c| {
95                let mut stmt = c.prepare(
96                    "SELECT pid, version, started_at, last_heartbeat FROM servers ORDER BY pid",
97                )?;
98                let mut out = Vec::new();
99                let mut rows = stmt.query([])?;
100                while let Some(r) = rows.next()? {
101                    out.push(ServerRow {
102                        pid: r.get(0)?,
103                        version: r.get(1)?,
104                        started_at: r.get(2)?,
105                        last_heartbeat: r.get(3)?,
106                    });
107                }
108                Ok::<_, rusqlite::Error>(out)
109            })
110            .await?;
111        Ok(rows)
112    }
113}
114
115fn now_epoch() -> i64 {
116    jiff::Timestamp::now().as_second()
117}
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Open a brand-new database file inside a throwaway directory.
    async fn fresh_db() -> Db {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("rover.db");
        let db = Db::open(&db_path).await.unwrap();
        // Deliberately leak the tempdir guard so the directory outlives this
        // helper; this is a unit test, the OS will reclaim on exit.
        std::mem::forget(dir);
        db
    }

    /// Run a parameterless SQL statement straight against the connection,
    /// panicking on failure. Used to doctor timestamps in test fixtures.
    async fn exec_raw(db: &Db, sql: &'static str) {
        db.conn
            .call(move |c| {
                c.execute(sql, [])?;
                Ok::<_, rusqlite::Error>(())
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn upsert_is_idempotent_and_updates_version() {
        let db = fresh_db().await;
        db.upsert_server_self(123, "0.1.0".into()).await.unwrap();
        db.upsert_server_self(123, "0.1.1".into()).await.unwrap();

        let listed = db.list_servers().await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].pid, 123);
        assert_eq!(listed[0].version, "0.1.1");
    }

    #[tokio::test]
    async fn heartbeat_updates_last_heartbeat() {
        let db = fresh_db().await;
        db.upsert_server_self(7, "v".into()).await.unwrap();
        let initial = db.list_servers().await.unwrap()[0].last_heartbeat;

        // Push the timestamp backwards to simulate elapsed time.
        exec_raw(&db, "UPDATE servers SET last_heartbeat = 100 WHERE pid = 7").await;

        db.heartbeat_server(7).await.unwrap();
        let updated = db.list_servers().await.unwrap()[0].last_heartbeat;
        assert!(updated > 100);
        assert!(updated >= initial);
    }

    #[tokio::test]
    async fn reap_stale_removes_old_rows_only() {
        let db = fresh_db().await;
        for pid in [1, 2, 3] {
            db.upsert_server_self(pid, "v".into()).await.unwrap();
        }

        // Mark PID 1 and 2 as ancient.
        exec_raw(
            &db,
            "UPDATE servers SET last_heartbeat = 0 WHERE pid IN (1,2)",
        )
        .await;

        let removed = db
            .reap_stale_servers(Duration::from_secs(60))
            .await
            .unwrap();
        assert_eq!(removed, 2);

        let survivors = db.list_servers().await.unwrap();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0].pid, 3);
    }

    #[tokio::test]
    async fn delete_self_is_idempotent() {
        let db = fresh_db().await;
        db.upsert_server_self(42, "v".into()).await.unwrap();

        // The second delete hits a row that is already gone and must still succeed.
        db.delete_server_self(42).await.unwrap();
        db.delete_server_self(42).await.unwrap();

        assert!(db.list_servers().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn heartbeat_on_missing_pid_is_noop() {
        let db = fresh_db().await;
        db.heartbeat_server(999).await.unwrap();
        assert!(db.list_servers().await.unwrap().is_empty());
    }
}