//! volli-manager 0.1.12
//!
//! Manager for volli — integration tests.
//!
//! Documentation: covers manager join/gossip over the in-memory transport,
//! worker whitelisting, and health-collector integration.
use super::*;
use crate::connection::handle_manager_as_client;
use crate::keys::load_csk;
use crate::test_harness::{InMemoryHarness, ManagerOptions};
use std::collections::HashMap;
use std::sync::{Arc, atomic::AtomicU32};
use tempfile::TempDir;
use tokio::sync::{Mutex, RwLock, broadcast};
use volli_core::{
    EnvironmentConfig, Role, WorkerConfig, override_env_config, override_env_config_patch,
};
use volli_transport::MemoryTransport;

/// A manager joining another manager as a client must receive the cluster
/// key after authenticating, and persist it to the profile's secret store.
#[tokio::test(flavor = "current_thread")]
async fn join_receives_cluster_key() {
    // Near-zero timings so the test never waits on real timeouts.
    let _env_guard = override_env_config(EnvironmentConfig {
        interval_ms: Some(0),
        backoff_ms: Some(0),
        connect_timeout_ms: Some(1),
        auth_timeout_secs: Some(1),
        heartbeat_secs: Some(1),
        ..Default::default()
    });
    let base = TempDir::new().unwrap();
    let _cfg_guard = volli_core::override_config_dir(Some(base.path()));
    // Distinct binding: re-binding `_env_guard` would shadow the first guard
    // and obscure which override is in effect for the rest of the test.
    let _patch_guard = override_env_config_patch(EnvironmentConfig {
        interval_ms: Some(10),
        backoff_ms: Some(10),
        ..Default::default()
    });

    let profile = "client";
    let dir = crate::keys::secret_dir(Some(profile));
    std::fs::create_dir_all(&dir).unwrap();

    // Fixed cluster secret key and version the "server" side will hand out.
    let csk = [7u8; 32];
    let csk_ver = 42;
    let tok = volli_core::token::issue_token(&csk, "self", "default", "joiner", 60).unwrap();
    let encoded = volli_core::token::encode_token(&tok).unwrap();

    let (client, mut server) = MemoryTransport::pair();

    let self_meta = ManagerPeerEntry {
        manager_id: "client".into(),
        manager_name: "client".into(),
        tenant: "self".into(),
        cluster: "default".into(),
        host: "127.0.0.1".into(),
        tcp_port: 0,
        quic_port: 0,
        pub_fp: String::new(),
        csk_ver: 0,
        tls_cert: String::new(),
        tls_fp: String::new(),
        health: None,
    };
    let peers = Arc::new(Mutex::new(HashMap::new()));
    let workers = Arc::new(Mutex::new(HashMap::new()));
    let (alive_tx, _) = broadcast::channel(16);
    let cfg = WorkerConfig {
        role: Role::Manager,
        token: encoded.clone(),
        ..Default::default()
    };
    // The client starts with no cluster key (all zeros, version 0); the
    // handshake is expected to fill these in.
    let csk_shared = Arc::new(RwLock::new([0u8; 32]));
    let csk_ver_shared = Arc::new(AtomicU32::new(0));
    let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();

    let ctx = ManagerContext::new_peer(
        connection::SecurityContext {
            signing: None,
            csk: csk_shared.clone(),
            csk_ver: csk_ver_shared.clone(),
        },
        connection::StateContext {
            peers: peers.clone(),
            workers,
            self_meta: self_meta.clone(),
            peer_version: Arc::new(AtomicU64::new(1)),
            command_distributor: None,
        },
        connection::CommunicationContext {
            alive_tx: alive_tx.clone(),
            dial_tx,
            profile: profile.to_string(),
        },
        cfg.clone(),
        connection::HealthContext::default(),
    );

    let client_fut = tokio::spawn(handle_manager_as_client(
        ctx,
        Box::new(client),
        "127.0.0.1".into(),
    ));
    // Scripted server side: auth, then key delivery, then first announce.
    let server_fut = async move {
        match server.recv().await.unwrap() {
            Some(Message::Auth { token, .. }) => {
                assert_eq!(token, encoded);
            }
            other => panic!("expected auth, got {other:?}"),
        }
        server.send(&Message::AuthOk).await.unwrap();

        server
            .send(&Message::ClusterKey { ver: csk_ver, csk })
            .await
            .unwrap();
        match server.recv().await.unwrap() {
            Some(Message::Announce { .. }) => {}
            // Message text fixed: this arm matches Announce, not a heartbeat.
            other => panic!("expected announce, got {other:?}"),
        }
    };

    let _ = tokio::join!(client_fut, server_fut);

    // The received key/version must have been written to disk for `profile`.
    let (saved, ver) = load_csk(profile).unwrap().unwrap();
    assert_eq!(saved, csk);
    assert_eq!(ver, csk_ver);
}

/// Builds a minimal `ManagerPeerEntry` for tests; `id` is used as both the
/// manager id and its display name, all other fields are inert defaults.
fn make_meta(id: &str) -> ManagerPeerEntry {
    let label = id.to_string();
    ManagerPeerEntry {
        manager_id: label.clone(),
        manager_name: label,
        tenant: String::from("self"),
        cluster: String::from("default"),
        host: String::from("127.0.0.1"),
        tcp_port: 0,
        quic_port: 0,
        pub_fp: String::new(),
        csk_ver: 0,
        tls_cert: String::new(),
        tls_fp: String::new(),
        health: None,
    }
}

#[tokio::test(flavor = "current_thread")]
async fn gossip_propagates_peer_list() {
    let _env_guard = override_env_config(EnvironmentConfig {
        interval_ms: Some(0),
        backoff_ms: Some(0),
        connect_timeout_ms: Some(1),
        auth_timeout_secs: Some(1),
        heartbeat_secs: Some(1),
        ..Default::default()
    });
    let base = TempDir::new().unwrap();
    let _cfg_guard = volli_core::override_config_dir(Some(base.path()));
    let _env_guard = override_env_config_patch(EnvironmentConfig {
        interval_ms: Some(10),
        backoff_ms: Some(10),
        ..Default::default()
    });

    let profile = "node-b";
    let dir = crate::keys::secret_dir(Some(profile));
    std::fs::create_dir_all(&dir).unwrap();

    let csk = [3u8; 32];
    let tok = volli_core::token::issue_token(&csk, "self", "default", "joiner", 60).unwrap();
    let encoded = volli_core::token::encode_token(&tok).unwrap();

    let (client, mut server) = MemoryTransport::pair();

    let self_meta = make_meta("b");
    let peers = Arc::new(Mutex::new(HashMap::new()));
    let workers = Arc::new(Mutex::new(HashMap::new()));
    let (alive_tx, _) = broadcast::channel(16);
    let cfg = WorkerConfig {
        role: Role::Manager,
        token: encoded.clone(),
        ..Default::default()
    };
    let csk_shared = Arc::new(RwLock::new([0u8; 32]));
    let csk_ver_shared = Arc::new(AtomicU32::new(0));
    let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();

    let ctx = ManagerContext::new_peer(
        connection::SecurityContext {
            signing: None,
            csk: csk_shared.clone(),
            csk_ver: csk_ver_shared.clone(),
        },
        connection::StateContext {
            peers: peers.clone(),
            workers,
            self_meta: self_meta.clone(),
            peer_version: Arc::new(AtomicU64::new(1)),
            command_distributor: None,
        },
        connection::CommunicationContext {
            alive_tx: alive_tx.clone(),
            dial_tx,
            profile: profile.to_string(),
        },
        cfg.clone(),
        connection::HealthContext::default(),
    );

    let client_fut = tokio::spawn(handle_manager_as_client(
        ctx,
        Box::new(client),
        "127.0.0.1".into(),
    ));

    let remote_meta = make_meta("a");
    let extra_peer = make_meta("c");
    let mut ver_rx = alive_tx.subscribe();
    let server_fut = async move {
        match server.recv().await.unwrap() {
            Some(Message::Auth { token, .. }) => assert_eq!(token, encoded),
            other => panic!("expected auth, got {other:?}"),
        }
        server.send(&Message::AuthOk).await.unwrap();

        server
            .send(&Message::ClusterKey { ver: 1, csk })
            .await
            .unwrap();
        match server.recv().await.unwrap() {
            Some(Message::Announce { .. }) => {}
            other => panic!("expected announce, got {other:?}"),
        }
        server
            .send(&Message::Announce {
                meta: Box::new(remote_meta),
                version: 1,
                peers: vec![extra_peer],
                workers: Vec::new(),
            })
            .await
            .unwrap();
        let _ = ver_rx.recv().await.unwrap();
    };

    let (client_res, _) = tokio::join!(client_fut, server_fut);
    let _ = client_res;

    let map = peers.lock().await;
    assert!(map.contains_key("self:default:a"));
    assert!(map.contains_key("self:default:c"));
}

/// Smoke test: a worker completes the handshake with a manager over the
/// in-memory transport and shuts the connection down cleanly.
#[tokio::test(flavor = "current_thread")]
async fn handshake_succeeds_over_memory_transport() {
    let harness = InMemoryHarness::new("tenant", "fleet");
    let mgr = harness.manager("manager-handshake");
    let wrk = harness.worker("worker-1");
    let conn = wrk.connect(&mgr);
    conn.shutdown().await;
}

/// A worker whose source address falls inside the configured whitelist
/// (the harness connects from 127.0.0.1) is accepted.
#[tokio::test(flavor = "current_thread")]
async fn worker_whitelist_enforced() {
    let harness = InMemoryHarness::new("tenant", "fleet");
    let options = ManagerOptions {
        worker_whitelist: vec!["127.0.0.1/32".parse().unwrap()],
        ..Default::default()
    };
    let mgr = harness.manager_with_options("manager-allow", options);
    let wrk = harness.worker("worker-allow");
    let conn = wrk.connect(&mgr);
    conn.shutdown().await;
}

/// A worker connecting from an address outside the whitelist must be
/// rejected by the manager even if it presents a valid token.
#[tokio::test(flavor = "current_thread")]
async fn worker_whitelist_rejects() {
    let harness = InMemoryHarness::new("tenant", "fleet");
    let options = ManagerOptions {
        worker_whitelist: vec!["10.0.0.0/8".parse().unwrap()],
        ..Default::default()
    };
    let manager = harness.manager_with_options("manager-deny", options);
    let token = harness.worker_token("worker-deny");

    let (mut conn, peer) = MemoryTransport::pair();
    // 192.168.0.10 is outside the 10.0.0.0/8 whitelist.
    let source = std::net::SocketAddr::new(std::net::IpAddr::from([192, 168, 0, 10]), 0);
    let task = manager.spawn_worker_task_with_addr(peer, source);

    // The send result is deliberately ignored: the server may already have
    // closed the connection before the auth frame lands.
    let _ = conn
        .send(&Message::Auth {
            token,
            worker_id: None,
            worker_name: None,
        })
        .await;
    drop(conn);
    let _ = task
        .await
        .unwrap()
        .expect_err("connection should be rejected");
}

mod health_integration {
    use crate::health::{HealthCollector, HealthConfig};
    use crate::workers::{WorkerTable, list_workers, update_worker};
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::{Mutex, broadcast};
    use volli_core::{ConfigGuard, EnvironmentConfig, WorkerEntry, override_env_config};

    /// Installs an environment override with near-zero timings so tests run
    /// without real waits; the guard restores the prior config on drop.
    fn fast_env() -> ConfigGuard {
        let overrides = EnvironmentConfig {
            interval_ms: Some(0),
            backoff_ms: Some(0),
            connect_timeout_ms: Some(10),
            auth_timeout_secs: Some(1),
            heartbeat_secs: Some(1),
            ..Default::default()
        };
        override_env_config(overrides)
    }

    /// Builds a `HealthCollector` with fixed CPU/memory readings
    /// (10.0 / 20.0) so metric assertions are deterministic.
    fn create_test_health_collector(
        max_workers: Option<u32>,
        metric_collection_interval: Duration,
    ) -> HealthCollector {
        HealthCollector::with_fixed_metrics(
            HealthConfig {
                max_workers,
                metric_collection_interval,
                sample_window_size: 3,
                cpu_threshold: 80.0,
                memory_threshold: 85.0,
            },
            Some(10.0),
            Some(20.0),
        )
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_health_collector_basic_functionality() {
        let _env = fast_env();
        let mut collector = create_test_health_collector(Some(100), Duration::from_millis(100));

        // A fresh collector tracks zero workers and zero load.
        let baseline = collector.collect_metrics().await;
        assert_eq!(baseline.current_workers, 0);
        assert!(baseline.health_score >= 0.0 && baseline.health_score <= 1.0);
        assert_eq!(baseline.load_percentage, 0.0);
        assert_eq!(baseline.max_workers, Some(100));

        // One worker out of a cap of 100 reads as 1% load.
        assert!(collector.can_accept_worker());
        collector.increment_worker_count();
        let after_add = collector.collect_metrics().await;
        assert_eq!(after_add.current_workers, 1);
        assert_eq!(after_add.load_percentage, 1.0);

        // Removing the worker returns the load to zero.
        collector.decrement_worker_count();
        let after_remove = collector.collect_metrics().await;
        assert_eq!(after_remove.current_workers, 0);
        assert_eq!(after_remove.load_percentage, 0.0);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_health_integration_with_worker_table() {
        let _env = fast_env();
        let table: WorkerTable = Arc::new(Mutex::new(HashMap::new()));
        let (tx, _rx) = broadcast::channel(1);
        let version = Arc::new(std::sync::atomic::AtomicU64::new(1));
        let mut collector = create_test_health_collector(Some(100), Duration::from_millis(100));

        // Register five workers in the table and mirror each registration
        // in the collector's counter.
        for i in 0..5 {
            let entry = WorkerEntry {
                worker_id: format!("worker_{}", i),
                manager_id: "test_manager".to_string(),
                worker_name: None,
                last_seen: Some(1000 + i),
                connected_since: Some(1000 + i),
                disconnected_at: None,
            };
            update_worker(&table, &tx, entry, &version).await;
            collector.increment_worker_count();
        }

        // Table contents and collector metrics must agree.
        let workers = list_workers(&table).await;
        let health_metrics = collector.collect_metrics().await;
        assert_eq!(workers.len(), 5);
        assert_eq!(health_metrics.current_workers, 5);

        // Dropping two workers leaves three tracked.
        collector.decrement_worker_count();
        collector.decrement_worker_count();
        let updated_metrics = collector.collect_metrics().await;
        assert_eq!(updated_metrics.current_workers, 3);
    }
}