volli-manager 0.1.12

Manager for volli
Documentation
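//! SQLite-based peer management for manager nodes
//!
//! This module wraps the shared peer database from `volli_core::peer_db`
//! (opened under the `manager` role) with profile-scoped helpers for loading,
//! saving, versioning, and removing manager peers. SQLite is used for its
//! performance, concurrency, and atomicity.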

use eyre::Report;
use std::sync::{Arc, Mutex};
use volli_core::{ManagerPeerEntry, peer_db};

/// Fetch the shared database handle for `profile` from the pool in volli-core.
///
/// Manager databases are always requested under the `manager` role.
fn get_database(profile: &str) -> Result<Arc<Mutex<peer_db::PeerDatabase>>, Report> {
    let role = "manager";
    peer_db::get_shared_database(role, profile)
}

/// Load all manager peers for a given profile
pub fn load_peers(profile: &str) -> Result<Vec<ManagerPeerEntry>, Report> {
    let db = get_database(profile)?;
    let db = db.lock().unwrap();

    tracing::debug!(target: "peer_load", profile=%profile, "loading peers from database");

    let peers = db.load_manager_peers(profile)?;

    tracing::debug!(target: "peer_load", profile=%profile, peer_count=%peers.len(), "loaded peers from database");

    Ok(peers)
}

/// Save all manager peers for a given profile (replaces existing peers)
pub fn save_peers(profile: &str, peers: &[ManagerPeerEntry]) -> Result<(), Report> {
    let db = get_database(profile)?;
    let mut db = db.lock().unwrap();

    tracing::debug!(target: "peer_save", profile=%profile, peer_count=%peers.len(), "saving peers to database");

    db.save_manager_peers(profile, peers)?;

    tracing::debug!(target: "peer_save", profile=%profile, "saved peers to database");

    Ok(())
}

/// Load the current peer version for a profile
pub fn load_peer_version(profile: &str) -> Result<u64, Report> {
    let db = get_database(profile)?;
    let db = db.lock().unwrap();

    db.get_peer_version(profile, "manager")
}

/// Save the peer version for a profile
pub fn save_peer_version(profile: &str, version: u64) -> Result<(), Report> {
    let db = get_database(profile)?;
    let mut db = db.lock().unwrap();

    db.set_peer_version(profile, "manager", version)
}

/// Add or update a single manager peer
pub fn add_peer(profile: &str, peer: ManagerPeerEntry) -> Result<(), Report> {
    let db = get_database(profile)?;
    let mut db = db.lock().unwrap();

    tracing::debug!(target: "peer_add", profile=%profile, manager_id=%peer.manager_id, "adding peer to database");

    db.add_manager_peer(profile, peer)?;

    Ok(())
}

/// Load peers but filter out temporary joining placeholders for gossip/announce messages
pub fn load_peers_for_gossip(profile: &str) -> Result<Vec<ManagerPeerEntry>, Report> {
    // Prefer in-memory cache to avoid DB reads during steady-state, but preserve
    // gossip semantics (exclude joining-* placeholders)
    if let Some(mut peers) = crate::peers::cached_peers_for_profile(profile) {
        peers.retain(|p| !p.manager_id.starts_with("joining-"));
        return Ok(peers);
    }
    let db = get_database(profile)?;
    let db = db.lock().unwrap();

    tracing::debug!(target: "peer_gossip", profile=%profile, "loading peers for gossip from database");

    let mut peers = db.load_manager_peers_for_gossip(profile)?;
    peers.retain(|p| !p.manager_id.starts_with("joining-"));

    tracing::debug!(target: "peer_gossip", profile=%profile, peer_count=%peers.len(), "loaded peers for gossip from database");

    Ok(peers)
}

/// Remove a manager peer
pub fn remove_peer(
    profile: &str,
    manager_id: &str,
    tenant: &str,
    cluster: &str,
) -> Result<(), Report> {
    let db = get_database(profile)?;
    let mut db = db.lock().unwrap();

    tracing::debug!(target: "peer_remove", profile=%profile, manager_id=%manager_id, "removing peer from database");

    let removed = db.remove_manager_peer(profile, manager_id, tenant, cluster)?;

    if removed {
        // Increment version when a peer is actually removed
        let new_version = db.increment_peer_version(profile, "manager")?;
        tracing::debug!(target: "peer_remove", profile=%profile, manager_id=%manager_id, new_version=%new_version, "removed peer and incremented version");
    } else {
        tracing::debug!(target: "peer_remove", profile=%profile, manager_id=%manager_id, "peer not found for removal");
    }

    Ok(())
}

/// Get database statistics for monitoring/debugging
pub fn get_database_stats(profile: &str) -> Result<DatabaseStats, Report> {
    let db = get_database(profile)?;
    let db = db.lock().unwrap();

    let size = db.get_db_size()?;
    let manager_peer_count = db.load_manager_peers(profile)?.len();
    let version = db.get_peer_version(profile, "manager")?;

    Ok(DatabaseStats {
        size_bytes: size,
        manager_peer_count,
        version,
    })
}

/// Optimize the database (vacuum and analyze)
pub fn optimize_database(profile: &str) -> Result<(), Report> {
    let db = get_database(profile)?;
    let mut db = db.lock().unwrap();

    tracing::info!(target: "peer_db", profile=%profile, "optimizing database");

    db.optimize()?;

    tracing::info!(target: "peer_db", profile=%profile, "database optimization complete");

    Ok(())
}

/// Database statistics for monitoring.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly (for
/// example to detect changes between polls, or in `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseStats {
    /// Database size as reported by the database's `get_db_size`.
    pub size_bytes: u64,
    /// Number of manager peers stored for the profile.
    pub manager_peer_count: usize,
    /// Current manager peer version for the profile.
    pub version: u64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use volli_core::{HealthMetrics, env_config};

    /// Baseline fixture: a fully populated manager peer with health metrics.
    fn create_test_peer() -> ManagerPeerEntry {
        let health = HealthMetrics {
            health_score: 0.95,
            load_percentage: 45.0,
            max_workers: Some(100),
            current_workers: 25,
            avg_cpu: Some(30.5),
            avg_memory: Some(60.2),
            last_health_update: 1634567890,
        };

        ManagerPeerEntry {
            manager_id: String::from("test-manager-1"),
            manager_name: String::from("Test Manager"),
            tenant: String::from("test-tenant"),
            cluster: String::from("test-cluster"),
            host: String::from("127.0.0.1"),
            tcp_port: 8080,
            quic_port: 8081,
            pub_fp: String::from("test-fp"),
            csk_ver: 1,
            tls_cert: String::from("test-cert"),
            tls_fp: String::from("test-tls-fp"),
            health: Some(health),
        }
    }

    /// Add, load, gossip-load, and remove a single peer end to end.
    #[test]
    fn test_peer_management_operations() {
        // The temp dir is declared before the override guard so the guard is
        // dropped first and the directory outlives the override.
        let scratch = tempdir().unwrap();
        let _config_override = env_config::override_config_dir(Some(scratch.path()));
        let profile = "test-profile";

        let fixture = create_test_peer();

        // Insert the peer and read it back.
        add_peer(profile, fixture.clone()).unwrap();
        let stored = load_peers(profile).unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].manager_id, fixture.manager_id);

        // A regular (non joining-) peer is visible to gossip.
        let for_gossip = load_peers_for_gossip(profile).unwrap();
        assert_eq!(for_gossip.len(), 1);

        // Removing the peer leaves the profile empty.
        remove_peer(
            profile,
            &fixture.manager_id,
            &fixture.tenant,
            &fixture.cluster,
        )
        .unwrap();
        let after_removal = load_peers(profile).unwrap();
        assert!(after_removal.is_empty());
    }

    /// The version starts at zero and round-trips through save/load.
    #[test]
    fn test_peer_versioning() {
        let scratch = tempdir().unwrap();
        let _config_override = env_config::override_config_dir(Some(scratch.path()));
        let profile = "test-profile-version";

        // Fresh profile reports version 0.
        assert_eq!(load_peer_version(profile).unwrap(), 0);

        // A saved version is what gets loaded back.
        save_peer_version(profile, 5).unwrap();
        assert_eq!(load_peer_version(profile).unwrap(), 5);
    }

    /// Joining placeholders are stored but hidden from gossip, and are
    /// replaced by a real peer on the same host/port.
    #[test]
    fn test_joining_peer_handling() {
        let scratch = tempdir().unwrap();
        let _config_override = env_config::override_config_dir(Some(scratch.path()));
        let profile = "test-profile-joining";

        // Store a joining placeholder.
        let placeholder = ManagerPeerEntry {
            manager_id: String::from("joining-temp-123"),
            ..create_test_peer()
        };
        add_peer(profile, placeholder).unwrap();

        // It shows up in the normal peer list...
        assert_eq!(load_peers(profile).unwrap().len(), 1);

        // ...but not in the gossip list.
        assert!(load_peers_for_gossip(profile).unwrap().is_empty());

        // Add a real peer with same host/port - should replace joining peer
        let real = ManagerPeerEntry {
            manager_id: String::from("real-manager-123"),
            ..create_test_peer()
        };
        add_peer(profile, real).unwrap();

        let stored = load_peers(profile).unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].manager_id, "real-manager-123");
    }

    /// Stats reflect the stored peer count, saved version, and a non-empty file.
    #[test]
    fn test_database_stats() {
        let scratch = tempdir().unwrap();
        let _config_override = env_config::override_config_dir(Some(scratch.path()));
        let profile = "test-profile-stats";

        add_peer(profile, create_test_peer()).unwrap();
        save_peer_version(profile, 42).unwrap();

        let snapshot = get_database_stats(profile).unwrap();
        assert_eq!(snapshot.manager_peer_count, 1);
        assert_eq!(snapshot.version, 42);
        assert!(snapshot.size_bytes > 0);
    }
}