//! volli-manager 0.1.12
//!
//! Manager for volli.
use super::{HealthContext, ManagerContext, SecurityContext, StateContext};
use crate::{
    ManagerPeerEntry,
    connection::{CommunicationContext, manager::process_peer_update},
};
use std::collections::HashMap;
use std::sync::{
    Arc,
    atomic::{AtomicU32, AtomicU64},
};
use tempfile::TempDir;
use tokio::{
    runtime::Builder,
    sync::{Mutex, RwLock, broadcast},
};
use volli_core::{EnvironmentConfig, Role, WorkerConfig, override_config_dir, override_env_config};

/// A manager receiving a peer list must persist genuinely remote peers but
/// must never (re-)insert its own entry from that list.
#[test]
fn self_not_added_from_peer_list() {
    // Isolate config and environment so the test never touches a real
    // installation; guards keep the overrides alive for the whole test.
    let scratch = TempDir::new().unwrap();
    let _cfg_guard = override_config_dir(Some(scratch.path()));
    let _env_guard = override_env_config(EnvironmentConfig {
        interval_ms: Some(0),
        backoff_ms: Some(0),
        ..Default::default()
    });

    // Minimal peer entry; only identity, host, and port vary between the
    // "self" and "remote" entries used below.
    let mk_peer = |id: &str, host: &str, port: u16| ManagerPeerEntry {
        manager_id: id.into(),
        manager_name: id.into(),
        tenant: "t".into(),
        cluster: "c".into(),
        host: host.into(),
        tcp_port: port,
        quic_port: port,
        pub_fp: "0".repeat(64),
        csk_ver: 0,
        tls_cert: String::new(),
        tls_fp: String::new(),
        health: None,
    };

    let runtime = Builder::new_current_thread().enable_all().build().unwrap();
    runtime.block_on(async {
        let self_meta = mk_peer("self", "127.0.0.1", 4242);

        // Shared state backing the manager context.
        let peer_map = Arc::new(Mutex::new(HashMap::new()));
        let worker_map = Arc::new(Mutex::new(HashMap::new()));
        let cluster_key = Arc::new(RwLock::new([0u8; 32]));
        let cluster_key_ver = Arc::new(AtomicU32::new(0));
        let peer_version = Arc::new(AtomicU64::new(0));
        let (alive_tx, _alive_rx) = broadcast::channel(8);
        let (dial_tx, _dial_rx) = tokio::sync::mpsc::unbounded_channel();

        let ctx = ManagerContext::new_peer(
            SecurityContext {
                signing: None,
                csk: cluster_key,
                csk_ver: cluster_key_ver,
            },
            StateContext {
                peers: peer_map,
                workers: worker_map,
                self_meta: self_meta.clone(),
                peer_version,
                command_distributor: None,
            },
            CommunicationContext {
                alive_tx,
                dial_tx,
                profile: "profile".into(),
            },
            WorkerConfig {
                host: "127.0.0.1".into(),
                quic_port: 4242,
                tcp_port: 0,
                protocol: None,
                token: "t".into(),
                fingerprint: String::new(),
                cert: Vec::new(),
                role: Role::Manager,
            },
            HealthContext::default(),
        );

        // A genuinely remote peer must be accepted and persisted.
        let remote_meta = mk_peer("remote", "127.0.0.2", 0);
        assert!(
            process_peer_update(&ctx, remote_meta).await,
            "remote peer should be persisted"
        );

        // Our own entry arriving via a peer list must be rejected.
        assert!(
            !process_peer_update(&ctx, self_meta).await,
            "self entry should be ignored"
        );
    });
}