use eyre::Report;
use std::sync::{Arc, Mutex};
use volli_core::{ManagerPeerEntry, peer_db};
/// Fetches the shared, mutex-guarded peer database handle for `profile`.
///
/// # Errors
///
/// Forwards whatever error `peer_db::get_shared_database` reports.
fn get_database(profile: &str) -> Result<Arc<Mutex<peer_db::PeerDatabase>>, Report> {
    // First argument handed to `get_shared_database`; presumably selects the
    // manager-side database — confirm against `peer_db`.
    const DB_LABEL: &str = "manager";
    peer_db::get_shared_database(DB_LABEL, profile)
}
/// Loads the manager peer entries stored for `profile`.
///
/// Emits debug events on the `peer_load` target before and after the read.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained or if
/// `load_manager_peers` fails.
///
/// # Panics
///
/// Panics if the database mutex is poisoned.
pub fn load_peers(profile: &str) -> Result<Vec<ManagerPeerEntry>, Report> {
    let shared = get_database(profile)?;
    let guard = shared.lock().unwrap();

    tracing::debug!(
        target: "peer_load",
        profile=%profile,
        "loading peers from database"
    );
    let entries = guard.load_manager_peers(profile)?;
    tracing::debug!(
        target: "peer_load",
        profile=%profile,
        peer_count=%entries.len(),
        "loaded peers from database"
    );

    Ok(entries)
}
/// Writes `peers` to the database for `profile` via `save_manager_peers`.
///
/// Emits debug events on the `peer_save` target before and after the write.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained or if
/// `save_manager_peers` fails.
///
/// # Panics
///
/// Panics if the database mutex is poisoned.
pub fn save_peers(profile: &str, peers: &[ManagerPeerEntry]) -> Result<(), Report> {
    let shared = get_database(profile)?;
    let mut guard = shared.lock().unwrap();

    tracing::debug!(
        target: "peer_save",
        profile=%profile,
        peer_count=%peers.len(),
        "saving peers to database"
    );
    guard.save_manager_peers(profile, peers)?;
    tracing::debug!(
        target: "peer_save",
        profile=%profile,
        "saved peers to database"
    );

    Ok(())
}
/// Reads the peer version recorded for `profile`, querying the database with
/// `"manager"` as the second argument of `get_peer_version`.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained or if the
/// version lookup fails.
///
/// # Panics
///
/// Panics if the database mutex is poisoned.
pub fn load_peer_version(profile: &str) -> Result<u64, Report> {
    let shared = get_database(profile)?;
    let guard = shared.lock().unwrap();
    guard.get_peer_version(profile, "manager")
}
/// Stores `version` as the peer version for `profile`, passing `"manager"` as
/// the second argument of `set_peer_version`.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained or if the
/// version update fails.
///
/// # Panics
///
/// Panics if the database mutex is poisoned.
pub fn save_peer_version(profile: &str, version: u64) -> Result<(), Report> {
    let shared = get_database(profile)?;
    let mut guard = shared.lock().unwrap();
    guard.set_peer_version(profile, "manager", version)
}
/// Adds `peer` to the database for `profile` via `add_manager_peer`.
///
/// Emits a debug event on the `peer_add` target carrying the peer's
/// `manager_id` before the insert.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained or if
/// `add_manager_peer` fails.
///
/// # Panics
///
/// Panics if the database mutex is poisoned.
pub fn add_peer(profile: &str, peer: ManagerPeerEntry) -> Result<(), Report> {
    let shared = get_database(profile)?;
    let mut guard = shared.lock().unwrap();

    tracing::debug!(
        target: "peer_add",
        profile=%profile,
        manager_id=%peer.manager_id,
        "adding peer to database"
    );
    guard.add_manager_peer(profile, peer)?;

    Ok(())
}
/// Returns the peers to gossip for `profile`, excluding every entry whose
/// `manager_id` starts with `"joining-"`.
///
/// The list from `crate::peers::cached_peers_for_profile` is used when it
/// returns `Some`; only otherwise is the database consulted through
/// `load_manager_peers_for_gossip`. Debug events on the `peer_gossip` target
/// are emitted on the database path only.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained or if the
/// database read fails. The cached path cannot fail.
///
/// # Panics
///
/// Panics if the database mutex is poisoned (database path only).
pub fn load_peers_for_gossip(profile: &str) -> Result<Vec<ManagerPeerEntry>, Report> {
    // Single definition of the filter applied on both the cached and the
    // database path.
    let strip_joining = |entries: &mut Vec<ManagerPeerEntry>| {
        entries.retain(|entry| !entry.manager_id.starts_with("joining-"));
    };

    match crate::peers::cached_peers_for_profile(profile) {
        Some(mut cached) => {
            strip_joining(&mut cached);
            Ok(cached)
        }
        None => {
            let shared = get_database(profile)?;
            let guard = shared.lock().unwrap();
            tracing::debug!(
                target: "peer_gossip",
                profile=%profile,
                "loading peers for gossip from database"
            );
            let mut stored = guard.load_manager_peers_for_gossip(profile)?;
            strip_joining(&mut stored);
            tracing::debug!(
                target: "peer_gossip",
                profile=%profile,
                peer_count=%stored.len(),
                "loaded peers for gossip from database"
            );
            Ok(stored)
        }
    }
}
/// Removes the peer identified by `manager_id`, `tenant` and `cluster` from
/// the database for `profile`.
///
/// When `remove_manager_peer` reports that an entry was removed, the peer
/// version is bumped through `increment_peer_version`; when nothing was
/// removed the version is left untouched. Both outcomes return `Ok(())` and
/// are reported as debug events on the `peer_remove` target.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained, or if the
/// removal or the version increment fails.
///
/// # Panics
///
/// Panics if the database mutex is poisoned.
pub fn remove_peer(
    profile: &str,
    manager_id: &str,
    tenant: &str,
    cluster: &str,
) -> Result<(), Report> {
    let shared = get_database(profile)?;
    let mut guard = shared.lock().unwrap();

    tracing::debug!(
        target: "peer_remove",
        profile=%profile,
        manager_id=%manager_id,
        "removing peer from database"
    );

    // Nothing removed: log it and leave the version as it is.
    if !guard.remove_manager_peer(profile, manager_id, tenant, cluster)? {
        tracing::debug!(
            target: "peer_remove",
            profile=%profile,
            manager_id=%manager_id,
            "peer not found for removal"
        );
        return Ok(());
    }

    let bumped = guard.increment_peer_version(profile, "manager")?;
    tracing::debug!(
        target: "peer_remove",
        profile=%profile,
        manager_id=%manager_id,
        new_version=%bumped,
        "removed peer and incremented version"
    );
    Ok(())
}
/// Collects a [`DatabaseStats`] snapshot for `profile`.
///
/// The three values are read in order under a single lock: the database size
/// from `get_db_size`, the number of entries returned by
/// `load_manager_peers`, and the version from `get_peer_version`.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained or if any of
/// the three reads fails.
///
/// # Panics
///
/// Panics if the database mutex is poisoned.
pub fn get_database_stats(profile: &str) -> Result<DatabaseStats, Report> {
    let shared = get_database(profile)?;
    let guard = shared.lock().unwrap();

    let size_bytes = guard.get_db_size()?;
    // The count is taken from the length of the fully loaded peer list.
    let manager_peer_count = guard.load_manager_peers(profile)?.len();
    let version = guard.get_peer_version(profile, "manager")?;

    Ok(DatabaseStats {
        size_bytes,
        manager_peer_count,
        version,
    })
}
/// Runs the database's `optimize` routine for `profile`.
///
/// Emits info-level events on the `peer_db` target before and after the call.
///
/// # Errors
///
/// Returns an error if the shared database cannot be obtained or if
/// `optimize` fails.
///
/// # Panics
///
/// Panics if the database mutex is poisoned.
pub fn optimize_database(profile: &str) -> Result<(), Report> {
    let shared = get_database(profile)?;
    let mut guard = shared.lock().unwrap();

    tracing::info!(
        target: "peer_db",
        profile=%profile,
        "optimizing database"
    );
    guard.optimize()?;
    tracing::info!(
        target: "peer_db",
        profile=%profile,
        "database optimization complete"
    );

    Ok(())
}
/// Snapshot of peer-database figures as assembled by `get_database_stats`.
#[derive(Debug, Clone)]
pub struct DatabaseStats {
/// Value returned by the database's `get_db_size`.
pub size_bytes: u64,
/// Number of entries returned by `load_manager_peers` for the profile.
pub manager_peer_count: usize,
/// Value returned by `get_peer_version` for the profile.
pub version: u64,
}
// Tests for the peer storage wrappers in this file. Each test creates its own
// temporary directory and uses its own profile name.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
use volli_core::{HealthMetrics, env_config};
// Builds a `ManagerPeerEntry` fixture with fixed values and populated health
// metrics; tests override `manager_id` where they need a different identity.
fn create_test_peer() -> ManagerPeerEntry {
ManagerPeerEntry {
manager_id: "test-manager-1".to_string(),
manager_name: "Test Manager".to_string(),
tenant: "test-tenant".to_string(),
cluster: "test-cluster".to_string(),
host: "127.0.0.1".to_string(),
tcp_port: 8080,
quic_port: 8081,
pub_fp: "test-fp".to_string(),
csk_ver: 1,
tls_cert: "test-cert".to_string(),
tls_fp: "test-tls-fp".to_string(),
health: Some(HealthMetrics {
health_score: 0.95,
load_percentage: 45.0,
max_workers: Some(100),
current_workers: 25,
avg_cpu: Some(30.5),
avg_memory: Some(60.2),
last_health_update: 1634567890,
}),
}
}
// Add a peer, read it back through both load paths, then remove it and check
// that the list is empty again.
#[test]
fn test_peer_management_operations() {
let temp_dir = tempdir().unwrap();
let profile = "test-profile";
// Presumably redirects the config directory to the temp dir while the guard
// is alive — confirm against `env_config::override_config_dir`.
let _guard = env_config::override_config_dir(Some(temp_dir.path()));
let peer = create_test_peer();
add_peer(profile, peer.clone()).unwrap();
let loaded_peers = load_peers(profile).unwrap();
assert_eq!(loaded_peers.len(), 1);
assert_eq!(loaded_peers[0].manager_id, peer.manager_id);
let gossip_peers = load_peers_for_gossip(profile).unwrap();
assert_eq!(gossip_peers.len(), 1);
remove_peer(profile, &peer.manager_id, &peer.tenant, &peer.cluster).unwrap();
let loaded_peers = load_peers(profile).unwrap();
assert_eq!(loaded_peers.len(), 0);
}
// A fresh profile is expected to report version 0; a saved version must be
// read back unchanged.
#[test]
fn test_peer_versioning() {
let temp_dir = tempdir().unwrap();
let profile = "test-profile-version";
let _guard = env_config::override_config_dir(Some(temp_dir.path()));
let version = load_peer_version(profile).unwrap();
assert_eq!(version, 0);
save_peer_version(profile, 5).unwrap();
let version = load_peer_version(profile).unwrap();
assert_eq!(version, 5);
}
// Entries whose id starts with "joining-" are stored but must not be returned
// for gossip.
#[test]
fn test_joining_peer_handling() {
let temp_dir = tempdir().unwrap();
let profile = "test-profile-joining";
let _guard = env_config::override_config_dir(Some(temp_dir.path()));
let mut joining_peer = create_test_peer();
joining_peer.manager_id = "joining-temp-123".to_string();
add_peer(profile, joining_peer.clone()).unwrap();
let peers = load_peers(profile).unwrap();
assert_eq!(peers.len(), 1);
let gossip_peers = load_peers_for_gossip(profile).unwrap();
assert_eq!(gossip_peers.len(), 0);
let mut real_peer = create_test_peer();
real_peer.manager_id = "real-manager-123".to_string();
add_peer(profile, real_peer.clone()).unwrap();
// The assertions below expect only the real peer to remain after it is added.
// NOTE(review): the replacement of the "joining-" entry is not implemented in
// this file; presumably `add_manager_peer` does it — confirm in `peer_db`.
let peers = load_peers(profile).unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].manager_id, "real-manager-123");
}
// Stats must reflect one stored peer, the explicitly saved version, and a
// non-zero database size.
#[test]
fn test_database_stats() {
let temp_dir = tempdir().unwrap();
let profile = "test-profile-stats";
let _guard = env_config::override_config_dir(Some(temp_dir.path()));
let peer = create_test_peer();
add_peer(profile, peer.clone()).unwrap();
save_peer_version(profile, 42).unwrap();
let stats = get_database_stats(profile).unwrap();
assert_eq!(stats.manager_peer_count, 1);
assert_eq!(stats.version, 42);
assert!(stats.size_bytes > 0);
}
}