volli-manager 0.1.12

Manager for volli
Documentation
//! Join functionality for manager-to-manager connections.
//!
//! This module consolidates both client-side and server-side join logic
//! to make it easier to compare and maintain the join exchange protocol.

use crate::bootstrap::{SigningInit, init_cert};
use crate::config::ServerConfigOpts;
use crate::connection::{ManagerContext, mesh::PeerDialRequest};
use crate::keys::secret_dir;
use crate::peers::update_alive;
use crate::{ManagerPeerEntry, add_peer, update_profile};
use base64::Engine;
use eyre::Report;
use sha2::{Digest, Sha256};
use std::sync::atomic::Ordering;
use tracing::info;
use volli_core::token::{decode_token, verify_token};
use volli_core::{Message, WorkerConfig};
use volli_transport::MessageTransportExt;
use volli_transport::Transport;

/// Configuration for the join process.
///
/// Describes how the joining manager presents itself to the cluster. All
/// string fields are borrowed, so the value is cheap to copy and pass around.
#[derive(Debug, Clone, Copy)]
pub struct JoinConfig<'a> {
    /// Host name or address published in this manager's peer entry and
    /// passed to certificate initialisation.
    pub advertise_host: &'a str,
    /// TCP port published in this manager's peer entry.
    pub tcp_port: u16,
    /// QUIC port published in this manager's peer entry.
    pub quic_port: u16,
    /// Explicit manager name; when `None`, a random name is generated
    /// during the join.
    pub manager_name: Option<&'a str>,
    /// Bind host stored in the profile alongside the advertised host.
    pub bind_host: &'a str,
}

/// Client-side join functionality.
///
/// Connects to an existing manager using a bootstrap token and exchanges
/// credentials to join the cluster mesh.
pub async fn join_as_client(
    join_peer: &ManagerPeerEntry,
    token: &str,
    profile: &str,
    config: &JoinConfig<'_>,
) -> Result<ManagerPeerEntry, Report> {
    info!(
        "Attempting to join via manager {} at {}:{}",
        join_peer.manager_name, join_peer.host, join_peer.quic_port
    );

    let join_cfg = WorkerConfig {
        role: volli_core::Role::Manager,
        host: join_peer.host.clone(),
        tcp_port: join_peer.tcp_port,
        quic_port: join_peer.quic_port,
        token: token.to_string(),
        cert: base64::engine::general_purpose::STANDARD_NO_PAD
            .decode(&join_peer.tls_cert)
            .map_err(|e| eyre::eyre!("Failed to decode join target certificate: {}", e))?,
        fingerprint: join_peer.tls_fp.clone(),
        ..Default::default()
    };

    let mut transport = crate::connection::mesh::try_connect_with_fallback(&join_cfg, &None)
        .await
        .map_err(|e| eyre::eyre!("Failed to connect for join: {}", e))?
        .0;

    transport
        .send(&Message::Join {
            token: token.to_string(),
        })
        .await?;

    info!("Awaiting join response");
    match transport.recv().await? {
        Some(Message::JoinResponse {
            ver,
            csk: new_csk,
            peer,
        }) => {
            info!("Received join response with cluster key version {}", ver);

            // Save cluster key using builder pattern for consistency
            if let Err(e) = update_profile(profile)?
                .cluster_key(&new_csk, ver, true)
                .save_all()
            {
                tracing::error!("Failed to save cluster key: {}", e);
                return Err(e);
            }

            // Now that join succeeded, generate our real peer metadata and send Announce
            let secret_dir = secret_dir(Some(profile));
            std::fs::create_dir_all(&secret_dir)?;

            let mut show_bootstrap = false;
            let SigningInit {
                id: manager_id,
                key: signing_key,
                newly_generated: persist_keys,
                sk_path,
                pk_path,
                ..
            } = crate::bootstrap::init_signing(secret_dir.as_path(), &mut show_bootstrap)?;

            let cert_init = init_cert(
                &ServerConfigOpts {
                    advertise_host: config.advertise_host.to_string(),
                    tcp_port: config.tcp_port,
                    quic_port: config.quic_port,
                    ..Default::default()
                },
                secret_dir.as_path(),
            )?;

            let manager_name = config
                .manager_name
                .map(|s| s.to_string())
                .unwrap_or_else(|| {
                    volli_core::namegen::random_profile()
                        .unwrap_or_else(|_| "joining-manager".to_string())
                });

            // Decode token to get tenant/cluster info
            let token_dec = volli_core::token::decode_token(token)?;

            let self_peer = ManagerPeerEntry {
                manager_id: manager_id.clone(),
                manager_name: manager_name.clone(),
                tenant: token_dec.payload.tenant.clone(),
                cluster: token_dec.payload.cluster.clone(),
                host: config.advertise_host.to_string(),
                tcp_port: config.tcp_port,
                quic_port: config.quic_port,
                pub_fp: hex::encode(&cert_init.fingerprint),
                csk_ver: ver,
                tls_cert: base64::engine::general_purpose::STANDARD_NO_PAD
                    .encode(&cert_init.cert_der),
                tls_fp: hex::encode(Sha256::digest(&cert_init.cert_der)),
                health: None, // Health metrics will be populated later
            };

            // Use unified profile builder to persist all data in batched operations
            update_profile(profile)?
                .name(&manager_name)
                .host(config.advertise_host)
                .bind_host(config.bind_host)
                .tcp_port(config.tcp_port)
                .quic_port(config.quic_port)
                .signing_key(&signing_key, persist_keys, sk_path, pk_path)
                .certificate(
                    &cert_init.cert_der,
                    &cert_init.key_der,
                    cert_init.newly_generated,
                    cert_init.cert_path.clone(),
                    cert_init.key_path.clone(),
                )
                .cluster_key(&new_csk, ver, true) // Always persist CSK during join
                .secret_dir(secret_dir)
                .save_all()?;

            // Send Announce message with our peer metadata
            transport
                .send(&Message::Announce {
                    meta: Box::new(self_peer.clone()),
                    version: 0,      // Initial version
                    peers: vec![],   // Empty on first announce
                    workers: vec![], // Empty on first announce
                })
                .await?;

            // Flush to ensure Announce message is transmitted before connection closes
            transport.flush().await?;

            // Wait for acknowledgment that the announcement was processed
            match transport.recv().await? {
                Some(Message::AuthOk) => {
                    tracing::debug!("Received acknowledgment for join announcement");
                }
                Some(other) => {
                    tracing::warn!("Expected acknowledgment, got: {:?}", other);
                }
                None => {
                    tracing::warn!("Connection closed before receiving acknowledgment");
                }
            }

            info!(
                "Join process completed successfully, sent self announcement: {}",
                manager_id
            );
            info!("Discovered real peer: {}", peer.manager_id);
            // Persist the join target as a known peer immediately so that any
            // workers connecting to this manager can receive full peer lists
            // even before the mesh finishes exchanging gossip.
            if let Err(e) = add_peer(profile, (*peer).clone()) {
                tracing::warn!("Failed to add join peer to storage: {}", e);
            }
            Ok(*peer)
        }
        Some(_) => {
            tracing::info!("Received unexpected message");
            Err(eyre::eyre!("Did not receive expected join response"))
        }
        _ => Err(eyre::eyre!("Did not receive expected join response")),
    }
}

/// Server-side join handling functionality.
///
/// Handles an incoming join request, validates the token, sends the cluster key,
/// and processes the joining manager's announcement.
///
/// Checks are applied in this order, each answered with `Message::AuthErr`
/// on failure:
/// 1. the token must decode;
/// 2. its tenant and cluster must equal those of this manager;
/// 3. if a manager whitelist is configured, the peer address must be in it;
/// 4. the token must verify against the current cluster key.
///
/// After a successful check the current cluster key and this manager's peer
/// entry are sent as `Message::JoinResponse`. The function then waits for the
/// joining manager's `Message::Announce`, stores and registers that peer,
/// queues it for dialing, acknowledges with `Message::AuthOk`, and finally
/// waits for the client to close the connection.
///
/// # Arguments
/// * `ctx` - Manager context providing the cluster key, whitelist, peer table
///   and dial queue.
/// * `transport` - Established connection to the joining manager.
/// * `peer` - Remote socket address, used for the whitelist check and logging.
/// * `token` - Token taken from the client's `Message::Join`.
///
/// # Errors
/// Returns an error when token verification fails or the `JoinResponse`
/// cannot be sent. Rejections from checks 1-3, as well as a client that
/// disconnects or misbehaves after the response, are logged and yield `Ok(())`.
pub async fn join_as_server(
    ctx: &ManagerContext,
    mut transport: Box<dyn Transport>,
    peer: std::net::SocketAddr,
    token: String,
) -> Result<(), Report> {
    let csk = *ctx.security.csk.read().await;
    let csk_ver = ctx.security.csk_ver.load(Ordering::SeqCst);

    // Join connection - authenticate and send JoinResponse
    let token_dec = match decode_token(&token) {
        Ok(t) => t,
        Err(e) => {
            tracing::error!("Join token decode failed for peer {}: {}", peer, e);
            transport.send(&Message::AuthErr).await.ok();
            return Ok(());
        }
    };

    if token_dec.payload.tenant != ctx.state.self_meta.tenant
        || token_dec.payload.cluster != ctx.state.self_meta.cluster
    {
        tracing::warn!(
            %peer,
            token_tenant = %token_dec.payload.tenant,
            token_cluster = %token_dec.payload.cluster,
            expected_tenant = %ctx.state.self_meta.tenant,
            expected_cluster = %ctx.state.self_meta.cluster,
            "join token tenant/cluster mismatch"
        );
        transport.send(&Message::AuthErr).await.ok();
        return Ok(());
    }

    // Check whitelist: an empty list admits every address. The list is only
    // read, so it is borrowed in a short scope instead of being cloned.
    let whitelisted = {
        let whitelist = &ctx.network.manager_nets;
        let peer_addr = peer.ip();
        whitelist.is_empty() || whitelist.iter().any(|net| net.contains(&peer_addr))
    };
    if !whitelisted {
        tracing::warn!(%peer, "join connection from non-whitelisted address");
        transport.send(&Message::AuthErr).await.ok();
        return Ok(());
    }

    // Unlike the rejections above, a verification failure is also reported
    // to the caller as an error.
    if let Err(e) = verify_token(&token_dec, &csk) {
        tracing::error!("Join token verification failed for peer {}: {}", peer, e);
        transport.send(&Message::AuthErr).await.ok();
        return Err(e);
    }

    info!(target: "connection", %peer, "join connection established");

    if let Err(e) = transport
        .send(&Message::JoinResponse {
            ver: csk_ver,
            csk,
            peer: Box::new(ctx.state.self_meta.clone()),
        })
        .await
    {
        tracing::error!("Failed to send join response to {}: {}", peer, e);
        return Err(e.into());
    }

    tracing::debug!("Join exchange completed, CSK provided to joining peer");
    // Flush so the JoinResponse actually reaches the client, which is
    // blocked waiting for it before it can announce itself.
    if let Err(e) = transport.flush().await {
        tracing::warn!("Failed to flush transport after join response: {}", e);
    }

    // Wait for joining manager to send Announce with their peer metadata
    tracing::debug!("Waiting for joining manager to announce itself: {}", peer);
    match transport.recv().await {
        Ok(Some(Message::Announce { meta, .. })) => {
            // NOTE(review): the announced metadata is accepted as sent; its
            // tenant/cluster are not re-checked against the token here.
            let joining_peer = *meta;
            tracing::info!(
                "Received peer announcement from joining manager: {} ({}:{})",
                joining_peer.manager_id,
                joining_peer.host,
                joining_peer.quic_port
            );

            // Add joining peer to our peer list and enqueue for dial
            if let Err(e) = add_peer(&ctx.communication.profile, joining_peer.clone()) {
                tracing::warn!("Failed to add joining peer to storage: {}", e);
            }

            // Update the in-memory peer table and increment version counter
            update_alive(
                &ctx.state.peers,
                &ctx.communication.alive_tx,
                &ctx.communication.profile,
                joining_peer.clone(),
                &ctx.state.peer_version,
            )
            .await;

            // Enqueue the joining peer for dial to establish mesh connection
            if ctx
                .communication
                .dial_tx
                .send(PeerDialRequest {
                    peer: joining_peer.clone(),
                    gossip_hint: false,
                })
                .is_err()
            {
                tracing::warn!(
                    "Failed to enqueue joining peer for dial: {}",
                    joining_peer.manager_id
                );
            } else {
                tracing::debug!(
                    "Enqueued joining peer for mesh connection: {} ({}:{})",
                    joining_peer.manager_id,
                    joining_peer.host,
                    joining_peer.quic_port
                );
            }

            // Send acknowledgment to confirm we processed the announcement
            let ack_sent = match transport.send(&Message::AuthOk).await {
                Ok(_) => true,
                Err(e) => {
                    tracing::warn!("Failed to send join announcement acknowledgment: {}", e);
                    false
                }
            };
            if ack_sent {
                tracing::debug!("Sent acknowledgment for join announcement");
                // The client blocks on this acknowledgment while we block on
                // its close below, so push it out like the other messages.
                if let Err(e) = transport.flush().await {
                    tracing::warn!(
                        "Failed to flush transport after join acknowledgment: {}",
                        e
                    );
                }
            }
        }
        Ok(Some(msg)) => {
            tracing::warn!("Expected Announce from joining client, got: {:?}", msg);
        }
        Ok(None) => {
            tracing::debug!(
                "Join client closed connection before announcing itself: {}",
                peer
            );
            // The connection is already gone; there is nothing left to wait for.
            return Ok(());
        }
        Err(e) => {
            tracing::debug!(
                "Join client disconnected before announcing itself: {} ({})",
                peer,
                e
            );
            // The connection is already gone; there is nothing left to wait for.
            return Ok(());
        }
    }

    // Wait for client to close the connection gracefully after announcement
    tracing::debug!("Waiting for join client to close connection: {}", peer);
    match transport.recv().await {
        Ok(None) => {
            tracing::debug!("Join client closed connection cleanly: {}", peer);
        }
        Ok(Some(msg)) => {
            tracing::warn!(
                "Unexpected message from join client after announce: {:?}",
                msg
            );
        }
        Err(e) => {
            tracing::debug!("Join client disconnected: {} ({})", peer, e);
        }
    }
    Ok(())
}