use crate::bootstrap::{SigningInit, init_cert};
use crate::config::ServerConfigOpts;
use crate::connection::{ManagerContext, mesh::PeerDialRequest};
use crate::keys::secret_dir;
use crate::peers::update_alive;
use crate::{ManagerPeerEntry, add_peer, update_profile};
use base64::Engine;
use eyre::Report;
use sha2::{Digest, Sha256};
use std::sync::atomic::Ordering;
use tracing::info;
use volli_core::token::{decode_token, verify_token};
use volli_core::{Message, WorkerConfig};
use volli_transport::MessageTransportExt;
use volli_transport::Transport;
/// Local settings a manager supplies when joining an existing cluster.
///
/// Every string is borrowed for `'a`; the struct owns no data of its own.
pub struct JoinConfig<'a> {
    /// Host written into this manager's own peer entry, its certificate
    /// options and its saved profile.
    pub advertise_host: &'a str,

    /// TCP port written into this manager's peer entry and saved profile.
    pub tcp_port: u16,

    /// QUIC port written into this manager's peer entry and saved profile.
    pub quic_port: u16,

    /// Name for this manager. When `None`, [`join_as_client`] asks the name
    /// generator for one and falls back to `"joining-manager"` if that fails.
    pub manager_name: Option<&'a str>,

    /// Host saved in the profile as the bind host.
    pub bind_host: &'a str,
}
pub async fn join_as_client(
join_peer: &ManagerPeerEntry,
token: &str,
profile: &str,
config: &JoinConfig<'_>,
) -> Result<ManagerPeerEntry, Report> {
info!(
"Attempting to join via manager {} at {}:{}",
join_peer.manager_name, join_peer.host, join_peer.quic_port
);
let join_cfg = WorkerConfig {
role: volli_core::Role::Manager,
host: join_peer.host.clone(),
tcp_port: join_peer.tcp_port,
quic_port: join_peer.quic_port,
token: token.to_string(),
cert: base64::engine::general_purpose::STANDARD_NO_PAD
.decode(&join_peer.tls_cert)
.map_err(|e| eyre::eyre!("Failed to decode join target certificate: {}", e))?,
fingerprint: join_peer.tls_fp.clone(),
..Default::default()
};
let mut transport = crate::connection::mesh::try_connect_with_fallback(&join_cfg, &None)
.await
.map_err(|e| eyre::eyre!("Failed to connect for join: {}", e))?
.0;
transport
.send(&Message::Join {
token: token.to_string(),
})
.await?;
info!("Awaiting join response");
match transport.recv().await? {
Some(Message::JoinResponse {
ver,
csk: new_csk,
peer,
}) => {
info!("Received join response with cluster key version {}", ver);
if let Err(e) = update_profile(profile)?
.cluster_key(&new_csk, ver, true)
.save_all()
{
tracing::error!("Failed to save cluster key: {}", e);
return Err(e);
}
let secret_dir = secret_dir(Some(profile));
std::fs::create_dir_all(&secret_dir)?;
let mut show_bootstrap = false;
let SigningInit {
id: manager_id,
key: signing_key,
newly_generated: persist_keys,
sk_path,
pk_path,
..
} = crate::bootstrap::init_signing(secret_dir.as_path(), &mut show_bootstrap)?;
let cert_init = init_cert(
&ServerConfigOpts {
advertise_host: config.advertise_host.to_string(),
tcp_port: config.tcp_port,
quic_port: config.quic_port,
..Default::default()
},
secret_dir.as_path(),
)?;
let manager_name = config
.manager_name
.map(|s| s.to_string())
.unwrap_or_else(|| {
volli_core::namegen::random_profile()
.unwrap_or_else(|_| "joining-manager".to_string())
});
let token_dec = volli_core::token::decode_token(token)?;
let self_peer = ManagerPeerEntry {
manager_id: manager_id.clone(),
manager_name: manager_name.clone(),
tenant: token_dec.payload.tenant.clone(),
cluster: token_dec.payload.cluster.clone(),
host: config.advertise_host.to_string(),
tcp_port: config.tcp_port,
quic_port: config.quic_port,
pub_fp: hex::encode(&cert_init.fingerprint),
csk_ver: ver,
tls_cert: base64::engine::general_purpose::STANDARD_NO_PAD
.encode(&cert_init.cert_der),
tls_fp: hex::encode(Sha256::digest(&cert_init.cert_der)),
health: None, };
update_profile(profile)?
.name(&manager_name)
.host(config.advertise_host)
.bind_host(config.bind_host)
.tcp_port(config.tcp_port)
.quic_port(config.quic_port)
.signing_key(&signing_key, persist_keys, sk_path, pk_path)
.certificate(
&cert_init.cert_der,
&cert_init.key_der,
cert_init.newly_generated,
cert_init.cert_path.clone(),
cert_init.key_path.clone(),
)
.cluster_key(&new_csk, ver, true) .secret_dir(secret_dir)
.save_all()?;
transport
.send(&Message::Announce {
meta: Box::new(self_peer.clone()),
version: 0, peers: vec![], workers: vec![], })
.await?;
transport.flush().await?;
match transport.recv().await? {
Some(Message::AuthOk) => {
tracing::debug!("Received acknowledgment for join announcement");
}
Some(other) => {
tracing::warn!("Expected acknowledgment, got: {:?}", other);
}
None => {
tracing::warn!("Connection closed before receiving acknowledgment");
}
}
info!(
"Join process completed successfully, sent self announcement: {}",
manager_id
);
info!("Discovered real peer: {}", peer.manager_id);
if let Err(e) = add_peer(profile, (*peer).clone()) {
tracing::warn!("Failed to add join peer to storage: {}", e);
}
Ok(*peer)
}
Some(_) => {
tracing::info!("Received unexpected message");
Err(eyre::eyre!("Did not receive expected join response"))
}
_ => Err(eyre::eyre!("Did not receive expected join response")),
}
}
pub async fn join_as_server(
ctx: &ManagerContext,
mut transport: Box<dyn Transport>,
peer: std::net::SocketAddr,
token: String,
) -> Result<(), Report> {
let csk = *ctx.security.csk.read().await;
let csk_ver = ctx.security.csk_ver.load(Ordering::SeqCst);
let whitelist = ctx.network.manager_nets.clone();
let token_dec = match decode_token(&token) {
Ok(t) => t,
Err(e) => {
tracing::error!("Join token decode failed for peer {}: {}", peer, e);
transport.send(&Message::AuthErr).await.ok();
return Ok(());
}
};
if token_dec.payload.tenant != ctx.state.self_meta.tenant
|| token_dec.payload.cluster != ctx.state.self_meta.cluster
{
tracing::warn!(
%peer,
token_tenant = %token_dec.payload.tenant,
token_cluster = %token_dec.payload.cluster,
expected_tenant = %ctx.state.self_meta.tenant,
expected_cluster = %ctx.state.self_meta.cluster,
"join token tenant/cluster mismatch"
);
transport.send(&Message::AuthErr).await.ok();
return Ok(());
}
if !whitelist.is_empty() {
let peer_addr = peer.ip();
if !whitelist.iter().any(|net| net.contains(&peer_addr)) {
tracing::warn!(%peer, "join connection from non-whitelisted address");
transport.send(&Message::AuthErr).await.ok();
return Ok(());
}
}
if let Err(e) = verify_token(&token_dec, &csk) {
tracing::error!("Join token verification failed for peer {}: {}", peer, e);
transport.send(&Message::AuthErr).await.ok();
return Err(e);
}
info!(target: "connection", %peer, "join connection established");
if let Err(e) = transport
.send(&Message::JoinResponse {
ver: csk_ver,
csk,
peer: Box::new(ctx.state.self_meta.clone()),
})
.await
{
tracing::error!("Failed to send join response to {}: {}", peer, e);
return Err(e.into());
}
tracing::debug!("Join exchange completed, CSK provided to joining peer");
if let Err(e) = transport.flush().await {
tracing::warn!("Failed to flush transport after join response: {}", e);
}
tracing::debug!("Waiting for joining manager to announce itself: {}", peer);
match transport.recv().await {
Ok(Some(Message::Announce { meta, .. })) => {
let joining_peer = *meta;
tracing::info!(
"Received peer announcement from joining manager: {} ({}:{})",
joining_peer.manager_id,
joining_peer.host,
joining_peer.quic_port
);
if let Err(e) = add_peer(&ctx.communication.profile, joining_peer.clone()) {
tracing::warn!("Failed to add joining peer to storage: {}", e);
}
update_alive(
&ctx.state.peers,
&ctx.communication.alive_tx,
&ctx.communication.profile,
joining_peer.clone(),
&ctx.state.peer_version,
)
.await;
if ctx
.communication
.dial_tx
.send(PeerDialRequest {
peer: joining_peer.clone(),
gossip_hint: false,
})
.is_err()
{
tracing::warn!(
"Failed to enqueue joining peer for dial: {}",
joining_peer.manager_id
);
} else {
tracing::debug!(
"Enqueued joining peer for mesh connection: {} ({}:{})",
joining_peer.manager_id,
joining_peer.host,
joining_peer.quic_port
);
}
if let Err(e) = transport.send(&Message::AuthOk).await {
tracing::warn!("Failed to send join announcement acknowledgment: {}", e);
} else {
tracing::debug!("Sent acknowledgment for join announcement");
}
}
Ok(Some(msg)) => {
tracing::warn!("Expected Announce from joining client, got: {:?}", msg);
}
Ok(None) => {
tracing::debug!(
"Join client closed connection before announcing itself: {}",
peer
);
}
Err(_) => {
tracing::debug!(
"Join client disconnected before announcing itself: {}",
peer
);
}
}
tracing::debug!("Waiting for join client to close connection: {}", peer);
match transport.recv().await {
Ok(None) => {
tracing::debug!("Join client closed connection cleanly: {}", peer);
}
Ok(Some(msg)) => {
tracing::warn!(
"Unexpected message from join client after announce: {:?}",
msg
);
}
Err(_) => {
tracing::debug!("Join client disconnected: {}", peer);
}
}
Ok(())
}