use std::time::Duration;
use rusqlite::params;
use super::{Db, StorageError};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerRow {
pub pid: i64,
pub version: String,
pub started_at: i64,
pub last_heartbeat: i64,
}
impl Db {
pub async fn upsert_server_self(&self, pid: i64, version: String) -> Result<(), StorageError> {
let now = now_epoch();
let version_for_sql = version.clone();
self.conn
.call(move |c| {
c.execute(
"INSERT INTO servers(pid, version, started_at, last_heartbeat)
VALUES (?1, ?2, ?3, ?3)
ON CONFLICT(pid) DO UPDATE SET
version = excluded.version,
started_at = excluded.started_at,
last_heartbeat = excluded.last_heartbeat",
params![pid, version_for_sql, now],
)?;
Ok::<_, rusqlite::Error>(())
})
.await?;
Ok(())
}
pub async fn heartbeat_server(&self, pid: i64) -> Result<(), StorageError> {
let now = now_epoch();
self.conn
.call(move |c| {
c.execute(
"UPDATE servers SET last_heartbeat = ?1 WHERE pid = ?2",
params![now, pid],
)?;
Ok::<_, rusqlite::Error>(())
})
.await?;
Ok(())
}
pub async fn reap_stale_servers(&self, threshold: Duration) -> Result<usize, StorageError> {
let cutoff = now_epoch() - threshold.as_secs() as i64;
let removed = self
.conn
.call(move |c| {
let n = c.execute(
"DELETE FROM servers WHERE last_heartbeat < ?1",
params![cutoff],
)?;
Ok::<_, rusqlite::Error>(n)
})
.await?;
Ok(removed)
}
pub async fn delete_server_self(&self, pid: i64) -> Result<(), StorageError> {
self.conn
.call(move |c| {
c.execute("DELETE FROM servers WHERE pid = ?1", params![pid])?;
Ok::<_, rusqlite::Error>(())
})
.await?;
Ok(())
}
pub async fn list_servers(&self) -> Result<Vec<ServerRow>, StorageError> {
let rows = self
.conn
.call(|c| {
let mut stmt = c.prepare(
"SELECT pid, version, started_at, last_heartbeat FROM servers ORDER BY pid",
)?;
let mut out = Vec::new();
let mut rows = stmt.query([])?;
while let Some(r) = rows.next()? {
out.push(ServerRow {
pid: r.get(0)?,
version: r.get(1)?,
started_at: r.get(2)?,
last_heartbeat: r.get(3)?,
});
}
Ok::<_, rusqlite::Error>(out)
})
.await?;
Ok(rows)
}
}
fn now_epoch() -> i64 {
jiff::Timestamp::now().as_second()
}
#[cfg(test)]
mod tests {
use super::*;
async fn fresh_db() -> Db {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("rover.db");
let db = Db::open(&path).await.unwrap();
std::mem::forget(tmp);
db
}
#[tokio::test]
async fn upsert_is_idempotent_and_updates_version() {
let db = fresh_db().await;
db.upsert_server_self(123, "0.1.0".into()).await.unwrap();
db.upsert_server_self(123, "0.1.1".into()).await.unwrap();
let rows = db.list_servers().await.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].pid, 123);
assert_eq!(rows[0].version, "0.1.1");
}
#[tokio::test]
async fn heartbeat_updates_last_heartbeat() {
let db = fresh_db().await;
db.upsert_server_self(7, "v".into()).await.unwrap();
let initial = db.list_servers().await.unwrap()[0].last_heartbeat;
db.conn
.call(|c| {
c.execute("UPDATE servers SET last_heartbeat = 100 WHERE pid = 7", [])?;
Ok::<_, rusqlite::Error>(())
})
.await
.unwrap();
db.heartbeat_server(7).await.unwrap();
let updated = db.list_servers().await.unwrap()[0].last_heartbeat;
assert!(updated > 100);
assert!(updated >= initial);
}
#[tokio::test]
async fn reap_stale_removes_old_rows_only() {
let db = fresh_db().await;
db.upsert_server_self(1, "v".into()).await.unwrap();
db.upsert_server_self(2, "v".into()).await.unwrap();
db.upsert_server_self(3, "v".into()).await.unwrap();
db.conn
.call(|c| {
c.execute(
"UPDATE servers SET last_heartbeat = 0 WHERE pid IN (1,2)",
[],
)?;
Ok::<_, rusqlite::Error>(())
})
.await
.unwrap();
let removed = db
.reap_stale_servers(Duration::from_secs(60))
.await
.unwrap();
assert_eq!(removed, 2);
let rows = db.list_servers().await.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].pid, 3);
}
#[tokio::test]
async fn delete_self_is_idempotent() {
let db = fresh_db().await;
db.upsert_server_self(42, "v".into()).await.unwrap();
db.delete_server_self(42).await.unwrap();
db.delete_server_self(42).await.unwrap();
assert!(db.list_servers().await.unwrap().is_empty());
}
#[tokio::test]
async fn heartbeat_on_missing_pid_is_noop() {
let db = fresh_db().await;
db.heartbeat_server(999).await.unwrap();
assert!(db.list_servers().await.unwrap().is_empty());
}
}