1use std::time::Duration;
9
10use rusqlite::params;
11
12use super::{Db, StorageError};
13
14#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct ServerRow {
17 pub pid: i64,
18 pub version: String,
19 pub started_at: i64,
20 pub last_heartbeat: i64,
21}
22
23impl Db {
24 pub async fn upsert_server_self(&self, pid: i64, version: String) -> Result<(), StorageError> {
27 let now = now_epoch();
28 let version_for_sql = version.clone();
29 self.conn
30 .call(move |c| {
31 c.execute(
32 "INSERT INTO servers(pid, version, started_at, last_heartbeat)
33 VALUES (?1, ?2, ?3, ?3)
34 ON CONFLICT(pid) DO UPDATE SET
35 version = excluded.version,
36 started_at = excluded.started_at,
37 last_heartbeat = excluded.last_heartbeat",
38 params![pid, version_for_sql, now],
39 )?;
40 Ok::<_, rusqlite::Error>(())
41 })
42 .await?;
43 Ok(())
44 }
45
46 pub async fn heartbeat_server(&self, pid: i64) -> Result<(), StorageError> {
49 let now = now_epoch();
50 self.conn
51 .call(move |c| {
52 c.execute(
53 "UPDATE servers SET last_heartbeat = ?1 WHERE pid = ?2",
54 params![now, pid],
55 )?;
56 Ok::<_, rusqlite::Error>(())
57 })
58 .await?;
59 Ok(())
60 }
61
62 pub async fn reap_stale_servers(&self, threshold: Duration) -> Result<usize, StorageError> {
65 let cutoff = now_epoch() - threshold.as_secs() as i64;
66 let removed = self
67 .conn
68 .call(move |c| {
69 let n = c.execute(
70 "DELETE FROM servers WHERE last_heartbeat < ?1",
71 params![cutoff],
72 )?;
73 Ok::<_, rusqlite::Error>(n)
74 })
75 .await?;
76 Ok(removed)
77 }
78
79 pub async fn delete_server_self(&self, pid: i64) -> Result<(), StorageError> {
81 self.conn
82 .call(move |c| {
83 c.execute("DELETE FROM servers WHERE pid = ?1", params![pid])?;
84 Ok::<_, rusqlite::Error>(())
85 })
86 .await?;
87 Ok(())
88 }
89
90 pub async fn list_servers(&self) -> Result<Vec<ServerRow>, StorageError> {
92 let rows = self
93 .conn
94 .call(|c| {
95 let mut stmt = c.prepare(
96 "SELECT pid, version, started_at, last_heartbeat FROM servers ORDER BY pid",
97 )?;
98 let mut out = Vec::new();
99 let mut rows = stmt.query([])?;
100 while let Some(r) = rows.next()? {
101 out.push(ServerRow {
102 pid: r.get(0)?,
103 version: r.get(1)?,
104 started_at: r.get(2)?,
105 last_heartbeat: r.get(3)?,
106 });
107 }
108 Ok::<_, rusqlite::Error>(out)
109 })
110 .await?;
111 Ok(rows)
112 }
113}
114
115fn now_epoch() -> i64 {
116 jiff::Timestamp::now().as_second()
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122
123 async fn fresh_db() -> Db {
124 let tmp = tempfile::tempdir().unwrap();
125 let path = tmp.path().join("rover.db");
126 let db = Db::open(&path).await.unwrap();
127 std::mem::forget(tmp);
130 db
131 }
132
133 #[tokio::test]
134 async fn upsert_is_idempotent_and_updates_version() {
135 let db = fresh_db().await;
136 db.upsert_server_self(123, "0.1.0".into()).await.unwrap();
137 db.upsert_server_self(123, "0.1.1".into()).await.unwrap();
138 let rows = db.list_servers().await.unwrap();
139 assert_eq!(rows.len(), 1);
140 assert_eq!(rows[0].pid, 123);
141 assert_eq!(rows[0].version, "0.1.1");
142 }
143
144 #[tokio::test]
145 async fn heartbeat_updates_last_heartbeat() {
146 let db = fresh_db().await;
147 db.upsert_server_self(7, "v".into()).await.unwrap();
148 let initial = db.list_servers().await.unwrap()[0].last_heartbeat;
149 db.conn
151 .call(|c| {
152 c.execute("UPDATE servers SET last_heartbeat = 100 WHERE pid = 7", [])?;
153 Ok::<_, rusqlite::Error>(())
154 })
155 .await
156 .unwrap();
157 db.heartbeat_server(7).await.unwrap();
158 let updated = db.list_servers().await.unwrap()[0].last_heartbeat;
159 assert!(updated > 100);
160 assert!(updated >= initial);
161 }
162
163 #[tokio::test]
164 async fn reap_stale_removes_old_rows_only() {
165 let db = fresh_db().await;
166 db.upsert_server_self(1, "v".into()).await.unwrap();
167 db.upsert_server_self(2, "v".into()).await.unwrap();
168 db.upsert_server_self(3, "v".into()).await.unwrap();
169 db.conn
171 .call(|c| {
172 c.execute(
173 "UPDATE servers SET last_heartbeat = 0 WHERE pid IN (1,2)",
174 [],
175 )?;
176 Ok::<_, rusqlite::Error>(())
177 })
178 .await
179 .unwrap();
180 let removed = db
181 .reap_stale_servers(Duration::from_secs(60))
182 .await
183 .unwrap();
184 assert_eq!(removed, 2);
185 let rows = db.list_servers().await.unwrap();
186 assert_eq!(rows.len(), 1);
187 assert_eq!(rows[0].pid, 3);
188 }
189
190 #[tokio::test]
191 async fn delete_self_is_idempotent() {
192 let db = fresh_db().await;
193 db.upsert_server_self(42, "v".into()).await.unwrap();
194 db.delete_server_self(42).await.unwrap();
195 db.delete_server_self(42).await.unwrap();
196 assert!(db.list_servers().await.unwrap().is_empty());
197 }
198
199 #[tokio::test]
200 async fn heartbeat_on_missing_pid_is_noop() {
201 let db = fresh_db().await;
202 db.heartbeat_server(999).await.unwrap();
203 assert!(db.list_servers().await.unwrap().is_empty());
204 }
205}