use crate::ManagerPeerEntry;
use crate::profile as core_profile;
use eyre::Report;
use rusqlite::{Connection, OptionalExtension, Transaction, params};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, OnceLock};
/// Convert an optional `u64` into an optional `i64` for SQLite storage;
/// values exceeding `i64::MAX` become `None`.
fn opt_i64_from_u64(value: Option<u64>) -> Option<i64> {
    i64::try_from(value?).ok()
}
/// Convert an optional `i64` read from SQLite into an optional `u64`;
/// negative values become `None`.
fn opt_u64_from_i64(value: Option<i64>) -> Option<u64> {
    match value {
        Some(v) => u64::try_from(v).ok(),
        None => None,
    }
}
/// Convert an `i64` read from SQLite into a `u64`, mapping negative
/// (corrupt or impossible) values to 0.
fn u64_from_i64(value: i64) -> u64 {
    match u64::try_from(value) {
        Ok(v) => v,
        Err(_) => 0,
    }
}
/// SQLite-backed store for manager and worker peer lists, keyed by profile.
/// Wraps a single `rusqlite::Connection`; share across threads via
/// `get_shared_database`, which hands out `Arc<Mutex<PeerDatabase>>`.
pub struct PeerDatabase {
    conn: Connection,
}
/// A worker peer as persisted in the `worker_peers` table.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkerPeerEntry {
    /// Hostname or IP address of the worker.
    pub host: String,
    /// QUIC listen port.
    pub quic_port: u16,
    /// TCP listen port.
    pub tcp_port: u16,
    /// Timestamp of the last successful contact, if any.
    /// NOTE(review): presumably a unix timestamp, matching the `unixepoch()`
    /// defaults used elsewhere in the schema — confirm with callers.
    pub last_ok: Option<u64>,
    /// Timestamp of the last failed contact, if any.
    pub last_fail: Option<u64>,
    /// Timestamp the peer was last observed in gossip (schema v2 column).
    pub last_seen_in_gossip: Option<u64>,
    /// TLS certificate (stored as text; empty-string default in the schema).
    pub tls_cert: String,
    /// TLS certificate fingerprint.
    pub tls_fp: String,
    /// Name of the manager this worker is associated with.
    pub manager_name: String,
    /// Optional manager peer record, persisted as a JSON blob.
    pub manager_peer: Option<ManagerPeerEntry>,
}
const SCHEMA_VERSION: i32 = 2;
impl PeerDatabase {
/// Open (creating if necessary) the peer database at `db_path`, configure
/// connection pragmas, and bring the schema up to date.
///
/// Parent directories are created as needed.
pub fn open(db_path: PathBuf) -> Result<Self, Report> {
    if let Some(parent) = db_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let conn = Connection::open(&db_path)?;
    // Connection tuning, applied in the same order as before: WAL journaling
    // with NORMAL sync, foreign keys on, and in-memory temp storage.
    let pragmas = [
        ("journal_mode", "WAL"),
        ("synchronous", "NORMAL"),
        ("foreign_keys", "ON"),
        ("temp_store", "MEMORY"),
    ];
    for (name, value) in pragmas {
        conn.pragma_update(None, name, value)?;
    }
    let mut db = Self { conn };
    db.migrate()?;
    Ok(db)
}
/// Bring the on-disk schema up to `SCHEMA_VERSION`, running any pending
/// migrations inside a single transaction.
///
/// The version is stored in the `schema_info` table under the `'version'`
/// key; a missing row is treated as version 0 (fresh database).
fn migrate(&mut self) -> Result<(), Report> {
    let tx = self.conn.transaction()?;
    // The bookkeeping table itself must exist before the version can be read.
    tx.execute(
        "CREATE TABLE IF NOT EXISTS schema_info (
            key TEXT PRIMARY KEY,
            value INTEGER NOT NULL
        )",
        [],
    )?;
    let current_version: i32 = tx
        .query_row(
            "SELECT value FROM schema_info WHERE key = 'version'",
            [],
            |row| row.get(0),
        )
        .optional()?
        .unwrap_or(0);
    if current_version < SCHEMA_VERSION {
        Self::run_migrations(&tx, current_version)?;
        // Stamp the new version in the same transaction as the migrations,
        // so a failure rolls the whole upgrade back together.
        tx.execute(
            "INSERT OR REPLACE INTO schema_info (key, value) VALUES ('version', ?)",
            params![SCHEMA_VERSION],
        )?;
    }
    tx.commit()?;
    Ok(())
}
/// Apply schema migrations stepwise from `from_version` to the latest.
///
/// Each `if from_version < N` block is one cumulative step, so a fresh
/// database (version 0) runs all of them in order. Runs inside the caller's
/// transaction (see `migrate`), so any failure rolls back the whole upgrade.
fn run_migrations(tx: &Transaction, from_version: i32) -> Result<(), Report> {
    if from_version < 1 {
        // v1: initial schema — manager/worker peer tables, the version table,
        // lookup indexes, and triggers that refresh `updated_at` on UPDATE.
        tx.execute(
            "CREATE TABLE IF NOT EXISTS manager_peers (
                profile TEXT NOT NULL,
                manager_id TEXT NOT NULL,
                manager_name TEXT NOT NULL,
                tenant TEXT NOT NULL,
                cluster TEXT NOT NULL,
                host TEXT NOT NULL,
                tcp_port INTEGER NOT NULL,
                quic_port INTEGER NOT NULL,
                pub_fp TEXT NOT NULL,
                csk_ver INTEGER NOT NULL,
                tls_cert TEXT NOT NULL,
                tls_fp TEXT NOT NULL,
                health_json TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                PRIMARY KEY (profile, manager_id, tenant, cluster)
            )",
            [],
        )?;
        // v1 deliberately lacks `last_seen_in_gossip`; the v2 step below adds
        // it, and runs for fresh databases too.
        tx.execute(
            "CREATE TABLE IF NOT EXISTS worker_peers (
                profile TEXT NOT NULL,
                host TEXT NOT NULL,
                quic_port INTEGER NOT NULL,
                tcp_port INTEGER NOT NULL,
                last_ok INTEGER,
                last_fail INTEGER,
                tls_cert TEXT NOT NULL DEFAULT '',
                tls_fp TEXT NOT NULL DEFAULT '',
                manager_name TEXT NOT NULL DEFAULT '',
                manager_peer_json TEXT,
                created_at INTEGER NOT NULL DEFAULT (unixepoch()),
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                PRIMARY KEY (profile, host, quic_port, tcp_port)
            )",
            [],
        )?;
        tx.execute(
            "CREATE TABLE IF NOT EXISTS peer_versions (
                profile TEXT NOT NULL,
                role TEXT NOT NULL,
                version INTEGER NOT NULL DEFAULT 0,
                updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
                PRIMARY KEY (profile, role)
            )",
            [],
        )?;
        tx.execute(
            "CREATE INDEX IF NOT EXISTS idx_manager_peers_tenant_cluster
             ON manager_peers(profile, tenant, cluster)",
            [],
        )?;
        tx.execute(
            "CREATE INDEX IF NOT EXISTS idx_worker_peers_profile
             ON worker_peers(profile)",
            [],
        )?;
        tx.execute(
            "CREATE TRIGGER IF NOT EXISTS update_manager_peers_timestamp
             AFTER UPDATE ON manager_peers
             BEGIN
                 UPDATE manager_peers SET updated_at = unixepoch() WHERE rowid = NEW.rowid;
             END",
            [],
        )?;
        tx.execute(
            "CREATE TRIGGER IF NOT EXISTS update_worker_peers_timestamp
             AFTER UPDATE ON worker_peers
             BEGIN
                 UPDATE worker_peers SET updated_at = unixepoch() WHERE rowid = NEW.rowid;
             END",
            [],
        )?;
    }
    if from_version < 2 {
        // v2: track when a worker was last seen in gossip.
        tx.execute(
            "ALTER TABLE worker_peers ADD COLUMN last_seen_in_gossip INTEGER",
            [],
        )?;
    }
    Ok(())
}
/// Load every manager peer stored for `profile`, ordered by `manager_id`.
///
/// Includes `joining-*` placeholder rows; use `load_manager_peers_for_gossip`
/// to exclude them.
pub fn load_manager_peers(&self, profile: &str) -> Result<Vec<ManagerPeerEntry>, Report> {
    let mut stmt = self.conn.prepare(
        "SELECT manager_id, manager_name, tenant, cluster, host, tcp_port, quic_port,
                pub_fp, csk_ver, tls_cert, tls_fp, health_json
         FROM manager_peers
         WHERE profile = ?
         ORDER BY manager_id",
    )?;
    let peer_iter = stmt.query_map([profile], |row| {
        // Health metrics are stored as an optional JSON blob; NULL or
        // unparseable JSON degrades to `None` rather than failing the load.
        let health_json: Option<String> = row.get(11)?;
        let health = match health_json {
            Some(json) => serde_json::from_str(&json).ok(),
            None => None,
        };
        Ok(ManagerPeerEntry {
            manager_id: row.get(0)?,
            manager_name: row.get(1)?,
            tenant: row.get(2)?,
            cluster: row.get(3)?,
            host: row.get(4)?,
            tcp_port: row.get(5)?,
            quic_port: row.get(6)?,
            pub_fp: row.get(7)?,
            csk_ver: row.get(8)?,
            tls_cert: row.get(9)?,
            tls_fp: row.get(10)?,
            health,
        })
    })?;
    let mut peers = Vec::new();
    for peer in peer_iter {
        peers.push(peer?);
    }
    tracing::debug!(
        target: "peer_db",
        profile = %profile,
        count = peers.len(),
        "loaded manager peers from database"
    );
    Ok(peers)
}
/// Replace the full manager peer list for `profile`.
///
/// The delete of the old rows and the insert of `peers` happen in one
/// transaction, so readers never observe a partially written list.
pub fn save_manager_peers(
    &mut self,
    profile: &str,
    peers: &[ManagerPeerEntry],
) -> Result<(), Report> {
    let tx = self.conn.transaction()?;
    tx.execute(
        "DELETE FROM manager_peers WHERE profile = ?",
        params![profile],
    )?;
    {
        // Scope the prepared statement so its borrow of `tx` ends before
        // `tx.commit()`.
        let mut stmt = tx.prepare(
            "INSERT INTO manager_peers
             (profile, manager_id, manager_name, tenant, cluster, host, tcp_port, quic_port,
              pub_fp, csk_ver, tls_cert, tls_fp, health_json)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )?;
        for peer in peers {
            // Serialize health metrics to JSON; serialization failure is
            // treated as "no health data" rather than an error.
            let health_json = peer
                .health
                .as_ref()
                .and_then(|h| serde_json::to_string(h).ok());
            stmt.execute(params![
                profile,
                peer.manager_id,
                peer.manager_name,
                peer.tenant,
                peer.cluster,
                peer.host,
                peer.tcp_port,
                peer.quic_port,
                peer.pub_fp,
                peer.csk_ver,
                peer.tls_cert,
                peer.tls_fp,
                health_json
            ])?;
        }
    }
    tx.commit()?;
    tracing::debug!(
        target: "peer_db",
        profile = %profile,
        count = peers.len(),
        "saved manager peers to database"
    );
    Ok(())
}
/// Insert or update a single manager peer for `profile`.
///
/// When a real (non-`joining-*`) peer is added, any `joining-*` placeholder
/// rows for the same endpoint/tenant/cluster are deleted first, so a peer
/// that completed its join replaces its temporary entry (see
/// `test_joining_peer_replacement`). Both steps share one transaction.
pub fn add_manager_peer(
    &mut self,
    profile: &str,
    peer: ManagerPeerEntry,
) -> Result<(), Report> {
    let tx = self.conn.transaction()?;
    if !peer.manager_id.starts_with("joining-") {
        // Drop the placeholder created while this peer was still joining.
        tx.execute(
            "DELETE FROM manager_peers
             WHERE profile = ? AND manager_id LIKE 'joining-%'
             AND host = ? AND tcp_port = ? AND quic_port = ?
             AND tenant = ? AND cluster = ?",
            params![
                profile,
                peer.host,
                peer.tcp_port,
                peer.quic_port,
                peer.tenant,
                peer.cluster
            ],
        )?;
    }
    // Serialization failure degrades to NULL health rather than an error.
    let health_json = peer
        .health
        .as_ref()
        .and_then(|h| serde_json::to_string(h).ok());
    // INSERT OR REPLACE upserts on the (profile, manager_id, tenant, cluster)
    // primary key.
    tx.execute(
        "INSERT OR REPLACE INTO manager_peers
         (profile, manager_id, manager_name, tenant, cluster, host, tcp_port, quic_port,
          pub_fp, csk_ver, tls_cert, tls_fp, health_json)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        params![
            profile,
            peer.manager_id,
            peer.manager_name,
            peer.tenant,
            peer.cluster,
            peer.host,
            peer.tcp_port,
            peer.quic_port,
            peer.pub_fp,
            peer.csk_ver,
            peer.tls_cert,
            peer.tls_fp,
            health_json
        ],
    )?;
    tx.commit()?;
    tracing::debug!(
        target: "peer_db",
        profile = %profile,
        manager_id = %peer.manager_id,
        "added/updated manager peer in database"
    );
    Ok(())
}
/// Delete one manager peer identified by its full primary key.
/// Returns `true` if a row was actually removed.
pub fn remove_manager_peer(
    &mut self,
    profile: &str,
    manager_id: &str,
    tenant: &str,
    cluster: &str,
) -> Result<bool, Report> {
    // `execute` reports the number of rows the DELETE touched; zero means
    // the peer was not present.
    let removed = self.conn.execute(
        "DELETE FROM manager_peers
         WHERE profile = ? AND manager_id = ? AND tenant = ? AND cluster = ?",
        params![profile, manager_id, tenant, cluster],
    )? > 0;
    if removed {
        tracing::debug!(
            target: "peer_db",
            profile = %profile,
            manager_id = %manager_id,
            "removed manager peer from database"
        );
    }
    Ok(removed)
}
/// Load the manager peers for `profile` that should be shared over gossip,
/// i.e. everything except `joining-*` placeholder rows, ordered by
/// `manager_id`.
pub fn load_manager_peers_for_gossip(
    &self,
    profile: &str,
) -> Result<Vec<ManagerPeerEntry>, Report> {
    let mut stmt = self.conn.prepare(
        "SELECT manager_id, manager_name, tenant, cluster, host, tcp_port, quic_port,
                pub_fp, csk_ver, tls_cert, tls_fp, health_json
         FROM manager_peers
         WHERE profile = ? AND manager_id NOT LIKE 'joining-%'
         ORDER BY manager_id",
    )?;
    let rows = stmt.query_map([profile], |row| {
        // health_json is an optional JSON blob; NULL or bad JSON maps to None.
        let health = row
            .get::<_, Option<String>>(11)?
            .and_then(|json| serde_json::from_str(&json).ok());
        Ok(ManagerPeerEntry {
            manager_id: row.get(0)?,
            manager_name: row.get(1)?,
            tenant: row.get(2)?,
            cluster: row.get(3)?,
            host: row.get(4)?,
            tcp_port: row.get(5)?,
            quic_port: row.get(6)?,
            pub_fp: row.get(7)?,
            csk_ver: row.get(8)?,
            tls_cert: row.get(9)?,
            tls_fp: row.get(10)?,
            health,
        })
    })?;
    // Collect fallibly: the first row error aborts the whole load.
    let peers = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(peers)
}
/// Load all worker peers for `profile`, together with the current worker
/// peer-list version (0 when no version row exists yet).
pub fn load_worker_peers(&self, profile: &str) -> Result<(Vec<WorkerPeerEntry>, u64), Report> {
    let mut stmt = self.conn.prepare(
        "SELECT host, quic_port, tcp_port, last_ok, last_fail, last_seen_in_gossip, tls_cert, tls_fp,
                manager_name, manager_peer_json
         FROM worker_peers
         WHERE profile = ?
         ORDER BY host, quic_port",
    )?;
    let peer_iter = stmt.query_map([profile], |row| {
        // The associated manager peer is stored as an optional JSON blob;
        // NULL or unparseable JSON degrades to `None`.
        let manager_peer_json: Option<String> = row.get(9)?;
        let manager_peer = match manager_peer_json {
            Some(json) => serde_json::from_str(&json).ok(),
            None => None,
        };
        Ok(WorkerPeerEntry {
            host: row.get(0)?,
            quic_port: row.get(1)?,
            tcp_port: row.get(2)?,
            // Timestamps are stored as signed i64; negative values map to None.
            last_ok: opt_u64_from_i64(row.get::<_, Option<i64>>(3)?),
            last_fail: opt_u64_from_i64(row.get::<_, Option<i64>>(4)?),
            last_seen_in_gossip: opt_u64_from_i64(row.get::<_, Option<i64>>(5)?),
            tls_cert: row.get(6)?,
            tls_fp: row.get(7)?,
            manager_name: row.get(8)?,
            manager_peer,
        })
    })?;
    let mut peers = Vec::new();
    for peer in peer_iter {
        peers.push(peer?);
    }
    let version = self.get_peer_version(profile, "worker")?;
    tracing::debug!(
        target: "peer_db",
        profile = %profile,
        count = peers.len(),
        version = version,
        "loaded worker peers from database"
    );
    Ok((peers, version))
}
/// Replace the full worker peer list for `profile` and record `version`.
///
/// The delete, the inserts, and the version write all share one transaction,
/// so the list and its version can never get out of sync.
pub fn save_worker_peers(
    &mut self,
    profile: &str,
    peers: &[WorkerPeerEntry],
    version: u64,
) -> Result<(), Report> {
    let tx = self.conn.transaction()?;
    tx.execute(
        "DELETE FROM worker_peers WHERE profile = ?",
        params![profile],
    )?;
    {
        // Scope the prepared statement so its borrow of `tx` ends before
        // `tx.commit()`.
        let mut stmt = tx.prepare(
            "INSERT INTO worker_peers
             (profile, host, quic_port, tcp_port, last_ok, last_fail, last_seen_in_gossip, tls_cert, tls_fp,
              manager_name, manager_peer_json)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )?;
        for peer in peers {
            // Serialization failure degrades to NULL rather than an error.
            let manager_peer_json = peer
                .manager_peer
                .as_ref()
                .and_then(|mp| serde_json::to_string(mp).ok());
            stmt.execute(params![
                profile,
                peer.host,
                peer.quic_port,
                peer.tcp_port,
                opt_i64_from_u64(peer.last_ok),
                opt_i64_from_u64(peer.last_fail),
                opt_i64_from_u64(peer.last_seen_in_gossip),
                peer.tls_cert,
                peer.tls_fp,
                peer.manager_name,
                manager_peer_json,
            ])?;
        }
    }
    Self::set_peer_version_tx(&tx, profile, "worker", version)?;
    tx.commit()?;
    tracing::debug!(
        target: "peer_db",
        profile = %profile,
        count = peers.len(),
        version = version,
        "saved worker peers to database"
    );
    Ok(())
}
/// Insert a single worker peer for `profile` if no row with the same
/// (profile, host, quic_port, tcp_port) key exists; an existing row is left
/// untouched (INSERT OR IGNORE).
pub fn add_worker_peer(&mut self, profile: &str, peer: WorkerPeerEntry) -> Result<(), Report> {
    // Serialization failure degrades to NULL rather than an error.
    let manager_peer_json = peer
        .manager_peer
        .as_ref()
        .and_then(|mp| serde_json::to_string(mp).ok());
    // Fix: the INSERT previously omitted `last_seen_in_gossip`, silently
    // dropping that field for peers added through this path (unlike
    // `save_worker_peers`, which persists it).
    let rows_affected = self.conn.execute(
        "INSERT OR IGNORE INTO worker_peers
         (profile, host, quic_port, tcp_port, last_ok, last_fail, last_seen_in_gossip,
          tls_cert, tls_fp, manager_name, manager_peer_json)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        params![
            profile,
            peer.host,
            peer.quic_port,
            peer.tcp_port,
            opt_i64_from_u64(peer.last_ok),
            opt_i64_from_u64(peer.last_fail),
            opt_i64_from_u64(peer.last_seen_in_gossip),
            peer.tls_cert,
            peer.tls_fp,
            peer.manager_name,
            manager_peer_json
        ],
    )?;
    if rows_affected > 0 {
        tracing::debug!(
            target: "peer_db",
            profile = %profile,
            host = %peer.host,
            quic_port = peer.quic_port,
            "added worker peer to database"
        );
    }
    Ok(())
}
pub fn get_peer_version(&self, profile: &str, role: &str) -> Result<u64, Report> {
let version: i64 = self
.conn
.query_row(
"SELECT version FROM peer_versions WHERE profile = ? AND role = ?",
params![profile, role],
|row| row.get(0),
)
.optional()?
.unwrap_or(0);
Ok(u64_from_i64(version))
}
/// Store the peer-list version for (`profile`, `role`), upserting via
/// INSERT OR REPLACE.
pub fn set_peer_version(
    &mut self,
    profile: &str,
    role: &str,
    version: u64,
) -> Result<(), Report> {
    // SQLite integers are signed 64-bit, so clamp instead of failing.
    let clamped = i64::try_from(version).unwrap_or(i64::MAX);
    self.conn.execute(
        "INSERT OR REPLACE INTO peer_versions (profile, role, version) VALUES (?, ?, ?)",
        params![profile, role, clamped],
    )?;
    Ok(())
}
/// Transaction-scoped variant of `set_peer_version`, used by
/// `save_worker_peers` so the version write commits atomically with the
/// peer rows.
fn set_peer_version_tx(
    tx: &Transaction,
    profile: &str,
    role: &str,
    version: u64,
) -> Result<(), Report> {
    // SQLite integers are signed 64-bit, so clamp instead of failing.
    let version = i64::try_from(version).unwrap_or(i64::MAX);
    tx.execute(
        "INSERT OR REPLACE INTO peer_versions (profile, role, version) VALUES (?, ?, ?)",
        params![profile, role, version],
    )?;
    Ok(())
}
pub fn increment_peer_version(&mut self, profile: &str, role: &str) -> Result<u64, Report> {
let current_version = self.get_peer_version(profile, role)?;
let new_version = current_version + 1;
self.set_peer_version(profile, role, new_version)?;
Ok(new_version)
}
pub fn get_db_size(&self) -> Result<u64, Report> {
let size: i64 = self.conn.query_row(
"SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()",
[],
|row| row.get(0),
)?;
Ok(size as u64)
}
/// Compact the database file (VACUUM) and refresh the query planner's
/// statistics (ANALYZE).
pub fn optimize(&mut self) -> Result<(), Report> {
    // Same two statements as before, run in order as a single batch.
    self.conn.execute_batch("VACUUM; ANALYZE;")?;
    Ok(())
}
}
// Process-wide cache of open peer databases, keyed by (role, profile), so
// each role/profile pair shares a single mutex-guarded connection.
type DbKey = (String, String); type DbMap = HashMap<DbKey, Arc<Mutex<PeerDatabase>>>;
static DB_POOL: OnceLock<Arc<Mutex<DbMap>>> = OnceLock::new();
/// Return the shared `PeerDatabase` for (`role`, `profile`), opening and
/// caching it in the process-wide pool on first use.
///
/// # Panics
/// Panics if the pool mutex has been poisoned by a previous panic.
pub fn get_shared_database(role: &str, profile: &str) -> Result<Arc<Mutex<PeerDatabase>>, Report> {
    let pool = DB_POOL.get_or_init(|| Arc::new(Mutex::new(HashMap::new())));
    let mut pool_guard = pool.lock().expect("db pool poisoned");
    let key: DbKey = (role.to_owned(), profile.to_owned());
    if let Some(existing) = pool_guard.get(&key) {
        return Ok(Arc::clone(existing));
    }
    // Cache miss: open (and migrate) while still holding the pool lock, so
    // concurrent callers cannot race to open the same database twice.
    let db_path: PathBuf = core_profile::profile_dir(role, profile).join("peers.db");
    let shared = Arc::new(Mutex::new(PeerDatabase::open(db_path)?));
    pool_guard.insert(key, Arc::clone(&shared));
    Ok(shared)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::HealthMetrics;
    use tempfile::tempdir;
    // Fixture: a fully populated manager peer, including health metrics, so
    // JSON round-tripping of `health` is exercised.
    fn create_test_manager_peer() -> ManagerPeerEntry {
        ManagerPeerEntry {
            manager_id: "test-manager-1".to_string(),
            manager_name: "Test Manager".to_string(),
            tenant: "test-tenant".to_string(),
            cluster: "test-cluster".to_string(),
            host: "127.0.0.1".to_string(),
            tcp_port: 8080,
            quic_port: 8081,
            pub_fp: "test-fp".to_string(),
            csk_ver: 1,
            tls_cert: "test-cert".to_string(),
            tls_fp: "test-tls-fp".to_string(),
            health: Some(HealthMetrics {
                health_score: 0.95,
                load_percentage: 45.0,
                max_workers: Some(100),
                current_workers: 25,
                avg_cpu: Some(30.5),
                avg_memory: Some(60.2),
                last_health_update: 1634567890,
            }),
        }
    }
    // Fixture: a worker peer with a mix of Some/None optional fields.
    fn create_test_worker_peer() -> WorkerPeerEntry {
        WorkerPeerEntry {
            host: "192.168.1.100".to_string(),
            quic_port: 9090,
            tcp_port: 9091,
            last_ok: Some(1634567890),
            last_fail: None,
            last_seen_in_gossip: None,
            tls_cert: "worker-cert".to_string(),
            tls_fp: "worker-fp".to_string(),
            manager_name: "test-manager".to_string(),
            manager_peer: None,
        }
    }
    // A fresh database should come up migrated, with all three peer tables.
    #[test]
    fn test_database_creation_and_migration() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let db = PeerDatabase::open(db_path).unwrap();
        let table_count: i32 = db.conn.query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN ('manager_peers', 'worker_peers', 'peer_versions')",
            [],
            |row| row.get(0)
        ).unwrap();
        assert_eq!(table_count, 3);
    }
    // Round-trip a manager peer: add, load (health survives JSON
    // serialization), remove, confirm gone.
    #[test]
    fn test_manager_peer_operations() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut db = PeerDatabase::open(db_path).unwrap();
        let peer = create_test_manager_peer();
        let profile = "test-profile";
        db.add_manager_peer(profile, peer.clone()).unwrap();
        let loaded_peers = db.load_manager_peers(profile).unwrap();
        assert_eq!(loaded_peers.len(), 1);
        assert_eq!(loaded_peers[0].manager_id, peer.manager_id);
        assert_eq!(loaded_peers[0].health, peer.health);
        let removed = db
            .remove_manager_peer(profile, &peer.manager_id, &peer.tenant, &peer.cluster)
            .unwrap();
        assert!(removed);
        let loaded_peers = db.load_manager_peers(profile).unwrap();
        assert_eq!(loaded_peers.len(), 0);
    }
    // save_worker_peers persists the list and version together; a second
    // add_worker_peer with the same key is ignored (INSERT OR IGNORE).
    #[test]
    fn test_worker_peer_operations() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut db = PeerDatabase::open(db_path).unwrap();
        let peer = create_test_worker_peer();
        let profile = "test-profile";
        db.save_worker_peers(profile, std::slice::from_ref(&peer), 42)
            .unwrap();
        let (loaded_peers, version) = db.load_worker_peers(profile).unwrap();
        assert_eq!(loaded_peers.len(), 1);
        assert_eq!(loaded_peers[0].host, peer.host);
        assert_eq!(version, 42);
        db.add_worker_peer(profile, peer.clone()).unwrap();
        let (loaded_peers, _) = db.load_worker_peers(profile).unwrap();
        assert_eq!(loaded_peers.len(), 1);
    }
    // Version bookkeeping: defaults to 0, set is readable, increment bumps.
    #[test]
    fn test_peer_versioning() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut db = PeerDatabase::open(db_path).unwrap();
        let profile = "test-profile";
        let version = db.get_peer_version(profile, "manager").unwrap();
        assert_eq!(version, 0);
        db.set_peer_version(profile, "manager", 5).unwrap();
        let version = db.get_peer_version(profile, "manager").unwrap();
        assert_eq!(version, 5);
        let new_version = db.increment_peer_version(profile, "manager").unwrap();
        assert_eq!(new_version, 6);
    }
    // Adding a real peer must delete the `joining-*` placeholder that shares
    // its endpoint/tenant/cluster, leaving a single (real) row.
    #[test]
    fn test_joining_peer_replacement() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let mut db = PeerDatabase::open(db_path).unwrap();
        let profile = "test-profile";
        let mut joining_peer = create_test_manager_peer();
        joining_peer.manager_id = "joining-temp-123".to_string();
        db.add_manager_peer(profile, joining_peer.clone()).unwrap();
        let mut real_peer = create_test_manager_peer();
        real_peer.manager_id = "real-manager-123".to_string();
        db.add_manager_peer(profile, real_peer.clone()).unwrap();
        let peers = db.load_manager_peers(profile).unwrap();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].manager_id, "real-manager-123");
        let gossip_peers = db.load_manager_peers_for_gossip(profile).unwrap();
        assert_eq!(gossip_peers.len(), 1);
        assert_eq!(gossip_peers[0].manager_id, "real-manager-123");
    }
}