use super::{ManagerContext, manager::handle_manager_as_client};
use crate::{
ConnectionState,
config::ServerConfigOpts,
load_peers,
mesh::{dial_token, peer_key, should_dial_peer},
peers::{AliveTable, AliveTx},
util::{PeerLog, now_secs, short_id, sleep_backoff},
workers::WorkerTable,
};
use base64::Engine as _;
use eyre::{Report, eyre};
use quinn::crypto::rustls::QuicClientConfig;
use quinn::{ClientConfig, Endpoint};
use rustls::RootCertStore;
use rustls::pki_types::{CertificateDer, ServerName};
use sha2::{Digest, Sha256};
use socket2::{SockRef, TcpKeepalive};
use std::collections::{HashMap, HashSet};
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
use tokio::net::TcpStream;
use tokio::sync::{Mutex, RwLock, mpsc};
use tokio::time::{Duration, timeout};
use tokio_rustls::TlsConnector;
use tracing::{debug, info, warn};
use volli_core::{ManagerPeerEntry, Protocol, Role, WorkerConfig, env_config};
use volli_transport::{QuicTransport, TcpTransport, Transport};
/// Maximum exponential-backoff delay (seconds) between reconnect attempts.
const MAX_BACKOFF: u64 = 60;
/// Interval (seconds) between resurrection attempts once a peer is marked dead.
const DEAD_RESURRECTION_INTERVAL: u64 = 300;
/// Consecutive connect failures before a peer is marked dead.
const DEAD_FAILURE_THRESHOLD: u32 = 5;
/// Dead peers whose last failure is older than this (30 days, in seconds)
/// are no longer retried by `run_peer_dialer`.
const DEAD_PEER_MAX_AGE: u64 = 30 * 24 * 3600;
/// Request to dial a mesh peer, consumed by the mesh runner's dial worker.
pub(crate) struct PeerDialRequest {
    /// Peer metadata (host, ports, TLS cert/fingerprint, identity).
    pub peer: ManagerPeerEntry,
    /// True when triggered by gossip; skips the first backoff sleep and
    /// accelerates resurrection of dead peers in `run_peer_dialer`.
    pub gossip_hint: bool,
}
/// Sending half of the dial-request channel (cloned into handlers/dialers).
pub(crate) type DialTx = mpsc::UnboundedSender<PeerDialRequest>;
/// Receiving half, drained by the dial worker in `run_manager_mesh`.
type DialRx = mpsc::UnboundedReceiver<PeerDialRequest>;
/// Spawn the background mesh runner task and return the dial-request sender.
///
/// The runner owns the dial loop; if it ever returns an error, the error is
/// logged and the task exits. Callers keep the returned `DialTx` to request
/// additional peer dials later.
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn_mesh_runner(
    cfg: &ServerConfigOpts,
    self_meta: ManagerPeerEntry,
    peers: AliveTable,
    peer_version: Arc<AtomicU64>,
    alive_tx: AliveTx,
    csk: Arc<RwLock<[u8; 32]>>,
    csk_ver: Arc<AtomicU32>,
    workers: WorkerTable,
    health: super::HealthContext,
) -> DialTx {
    let (dial_tx, dial_rx) = mpsc::unbounded_channel();
    // Clone everything the background task needs before moving it in.
    let initial_peers = cfg.peers.clone();
    let profile = cfg.profile.clone();
    let alive_table = peers.clone();
    let tx = alive_tx.clone();
    let meta = self_meta.clone();
    let version = peer_version.clone();
    let worker_table = workers.clone();
    let key_version = csk_ver.clone();
    let runner_dial_tx = dial_tx.clone();
    tokio::spawn(async move {
        if let Err(e) = run_manager_mesh(
            initial_peers,
            meta,
            alive_table,
            tx,
            worker_table,
            profile,
            version,
            csk,
            key_version,
            health,
            runner_dial_tx,
            dial_rx,
        )
        .await
        {
            tracing::error!("mesh error: {}", e);
        }
    });
    dial_tx
}
fn configure_client(cert: &[u8], alpn: &str) -> Result<ClientConfig, Report> {
let mut roots = rustls::RootCertStore::empty();
roots.add(CertificateDer::from(cert.to_vec()))?;
let mut crypto = rustls::ClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth();
crypto.alpn_protocols = vec![alpn.as_bytes().to_vec()];
let quic_crypto = QuicClientConfig::try_from(crypto)?;
let mut cfg = ClientConfig::new(Arc::new(quic_crypto));
let ecfg = env_config();
let hb = ecfg.heartbeat_secs();
let mut transport = quinn::TransportConfig::default();
transport.keep_alive_interval(Some(Duration::from_secs(hb)));
transport.max_idle_timeout(Some(ecfg.quic_idle_duration()?.try_into()?));
cfg.transport_config(Arc::new(transport));
Ok(cfg)
}
/// Await `fut` for at most `dur`.
///
/// A timeout becomes a "connection timed out" `Report`; the future's own
/// error type is converted into a `Report` as well.
async fn timeout_connect<F, T, E>(dur: Duration, fut: F) -> Result<T, Report>
where
    F: std::future::Future<Output = Result<T, E>>,
    E: std::error::Error + Send + Sync + 'static,
{
    match timeout(dur, fut).await {
        Ok(inner) => inner.map_err(Report::from),
        Err(_) => Err(eyre!("connection timed out")),
    }
}
/// Open a TLS-over-TCP transport to the manager described by `cfg`.
///
/// Tries each resolved address in turn; on success, enables TCP keep-alive,
/// performs a TLS handshake with ALPN "volli/manager", and verifies the
/// server certificate's SHA-256 hash against `cfg.fingerprint`. Returns the
/// transport plus the remote peer address rendered as a string.
async fn connect_manager_tcp(cfg: &WorkerConfig) -> Result<(Box<dyn Transport>, String), Report> {
    let addrs = (cfg.host.as_str(), cfg.tcp_port).to_socket_addrs()?;
    let timeout = Duration::from_millis(env_config().connect_timeout_ms());
    let mut last_err = None;
    for addr in addrs {
        match timeout_connect(timeout, TcpStream::connect(addr)).await {
            Ok(stream) => {
                // Drop to a std socket so socket2 can set keep-alive options,
                // then convert back to a non-blocking tokio stream.
                let std_stream = stream.into_std().map_err(|e| eyre!(e))?;
                let keepalive = TcpKeepalive::new()
                    .with_time(Duration::from_secs(env_config().heartbeat_secs()))
                    .with_interval(Duration::from_secs(env_config().heartbeat_secs()));
                let sock = SockRef::from(&std_stream);
                // Keep-alive setup is best-effort; failures are ignored.
                sock.set_keepalive(true).ok();
                sock.set_tcp_keepalive(&keepalive).ok();
                std_stream.set_nonblocking(true).ok();
                let stream = TcpStream::from_std(std_stream).map_err(|e| eyre!(e))?;
                let alpn = "volli/manager";
                // Trust only the pinned peer certificate from the config.
                let mut roots = RootCertStore::empty();
                roots.add(CertificateDer::from(cfg.cert.clone()))?;
                let mut root = rustls::ClientConfig::builder()
                    .with_root_certificates(roots)
                    .with_no_client_auth();
                root.alpn_protocols = vec![alpn.as_bytes().to_vec()];
                let connector = TlsConnector::from(Arc::new(root));
                match connector
                    .connect(ServerName::try_from("volli")?.to_owned(), stream)
                    .await
                {
                    Ok(tls) => {
                        // Fingerprint pin check: a mismatch aborts the whole
                        // attempt immediately instead of trying more addresses.
                        if let Some(certs) = tls.get_ref().1.peer_certificates()
                            && let Some(cert) = certs.first()
                        {
                            let hash = Sha256::digest(cert.as_ref());
                            if hex::encode(hash) != cfg.fingerprint {
                                return Err(eyre!("server fingerprint mismatch"));
                            }
                        }
                        let peer = tls.get_ref().0.peer_addr()?.to_string();
                        return Ok((Box::new(TcpTransport::new(tls)), peer));
                    }
                    Err(e) => last_err = Some(e.into()),
                }
            }
            Err(e) => last_err = Some(e),
        }
    }
    // Every candidate address failed; surface the last error seen.
    Err(last_err.unwrap_or_else(|| eyre!("no address succeeded")))
}
/// Open a QUIC transport to the manager described by `cfg`.
///
/// Tries each resolved address, binding a fresh client endpoint per address
/// (IPv6 targets bind `[::]:0`, IPv4 bind `0.0.0.0:0`). After connecting it
/// verifies the peer certificate's SHA-256 hash against `cfg.fingerprint`,
/// then opens a single bidirectional stream as the transport. Returns the
/// transport plus the remote address rendered as a string.
async fn connect_manager_quic(cfg: &WorkerConfig) -> Result<(Box<dyn Transport>, String), Report> {
    let addrs = (cfg.host.as_str(), cfg.quic_port).to_socket_addrs()?;
    let quinn_cfg = configure_client(&cfg.cert, "volli/manager")?;
    let timeout = Duration::from_millis(env_config().connect_timeout_ms());
    let mut last_err = None;
    for addr in addrs {
        // Local bind family must match the destination address family.
        let bind: std::net::SocketAddr = if addr.is_ipv6() {
            "[::]:0".parse()?
        } else {
            "0.0.0.0:0".parse()?
        };
        match Endpoint::client(bind) {
            Ok(mut endpoint) => {
                endpoint.set_default_client_config(quinn_cfg.clone());
                match endpoint.connect(addr, "volli") {
                    Ok(connecting) => match timeout_connect(timeout, connecting).await {
                        Ok(connection) => {
                            // Fingerprint pin check: a mismatch aborts the
                            // whole attempt rather than trying more addresses.
                            if let Some(identity) = connection.peer_identity()
                                && let Ok(certs) = identity.downcast::<Vec<CertificateDer>>()
                                && let Some(cert) = certs.first()
                            {
                                let hash = Sha256::digest(cert.as_ref());
                                if hex::encode(hash) != cfg.fingerprint {
                                    return Err(eyre!("server fingerprint mismatch"));
                                }
                            }
                            let peer = connection.remote_address().to_string();
                            let (send, recv) = connection.open_bi().await?;
                            return Ok((Box::new(QuicTransport::new(send, recv)), peer));
                        }
                        Err(e) => last_err = Some(e),
                    },
                    Err(e) => last_err = Some(e.into()),
                }
            }
            Err(e) => last_err = Some(e.into()),
        }
    }
    // Every candidate address failed; surface the last error seen.
    Err(last_err.unwrap_or_else(|| eyre!("no address succeeded")))
}
/// Core mesh loop.
///
/// Spawns the dial worker that serializes all dial requests (deduplicating
/// against running dialers, precedence rules, and already-active peers),
/// seeds it with the statically configured peers plus the cached/persisted
/// peers for `profile`, then re-sends dial requests for all known peers on
/// every alive-table broadcast. The final loop never exits on its own.
#[allow(clippy::too_many_arguments)]
async fn run_manager_mesh(
    initial_peers: Vec<ManagerPeerEntry>,
    self_meta: ManagerPeerEntry,
    alive_table: AliveTable,
    alive_tx: AliveTx,
    workers: WorkerTable,
    profile: String,
    peer_version: Arc<AtomicU64>,
    csk: Arc<RwLock<[u8; 32]>>,
    csk_ver: Arc<AtomicU32>,
    health: super::HealthContext,
    dial_tx: DialTx,
    mut dial_rx: DialRx,
) -> Result<(), Report> {
    // Template config each dialer fills in with per-peer host/port/cert data.
    let base_cfg = WorkerConfig {
        role: Role::Manager,
        ..Default::default()
    };
    let proto_pref = None;
    // Prefer the in-memory peer cache; fall back to loading from disk.
    let profile_peers = crate::peers::cached_peers_for_profile(&profile)
        .unwrap_or_else(|| load_peers(&profile).unwrap_or_default());
    // At most one dialer task per peer key; used to dedupe concurrent dials.
    let peer_connections = Arc::new(Mutex::new(
        HashMap::<String, tokio::task::JoinHandle<()>>::new(),
    ));
    // Peers precedence rules say we must never dial (they dial us instead).
    let ignored_peers = Arc::new(Mutex::new(HashSet::<String>::new()));
    let pc_clone = peer_connections.clone();
    let ignore_clone = ignored_peers.clone();
    let alive_table_clone = alive_table.clone();
    let alive_tx_clone = alive_tx.clone();
    let workers_clone = workers.clone();
    let profile_clone = profile.clone();
    let peer_version_clone = peer_version.clone();
    let health_clone = health.clone();
    let proto_pref_clone = proto_pref.clone();
    let csk_clone = csk.clone();
    let csk_ver_clone = csk_ver.clone();
    let self_meta_clone = self_meta.clone();
    let dial_tx_worker = dial_tx.clone();
    // Dial worker: drains the request channel and spawns at most one dialer
    // per peer, skipping ignored, already-dialing, or already-active peers.
    tokio::spawn(async move {
        while let Some(req) = dial_rx.recv().await {
            let key = peer_key(&req.peer);
            debug!(
                "Processing dial request for peer {} ({}) [gossip_hint: {}]",
                req.peer.manager_name,
                short_id(&req.peer.manager_id),
                req.gossip_hint
            );
            if ignore_clone.lock().await.contains(&key) {
                debug!(
                    "Skipping ignored peer {} ({})",
                    req.peer.manager_name,
                    short_id(&req.peer.manager_id)
                );
                continue;
            }
            // A dialer task for this peer key is already registered.
            if pc_clone.lock().await.contains_key(&key) {
                debug!(
                    "Peer {} ({}) already has active connection",
                    req.peer.manager_name,
                    short_id(&req.peer.manager_id)
                );
                continue;
            }
            // Precedence decides which side of a peer pair initiates; if it
            // is not us, remember that permanently for this peer key.
            if !should_dial_peer(&self_meta_clone, &req.peer) {
                debug!(
                    "Precedence rules prevent dialing peer {} ({})",
                    req.peer.manager_name,
                    short_id(&req.peer.manager_id)
                );
                ignore_clone.lock().await.insert(key);
                continue;
            }
            // Already connected (any non-inactive state) per the alive table.
            let is_active = {
                let map = alive_table_clone.lock().await;
                map.get(&key)
                    .map(|s| s.conn_state != ConnectionState::Inactive)
                    .unwrap_or(false)
            };
            if is_active {
                debug!(
                    "Peer {} ({}) already active; skipping dial",
                    req.peer.manager_name,
                    short_id(&req.peer.manager_id)
                );
                continue;
            }
            debug!(
                "Spawning dialer for peer {} ({})",
                req.peer.manager_name,
                short_id(&req.peer.manager_id)
            );
            spawn_peer_dialer(
                base_cfg.clone(),
                req.peer,
                self_meta_clone.clone(),
                alive_table_clone.clone(),
                alive_tx_clone.clone(),
                workers_clone.clone(),
                profile_clone.clone(),
                peer_version_clone.clone(),
                proto_pref_clone.clone(),
                pc_clone.clone(),
                csk_clone.clone(),
                csk_ver_clone.clone(),
                dial_tx_worker.clone(),
                health_clone.clone(),
                req.gossip_hint,
            )
            .await;
        }
    });
    // Seed: statically configured peers first, then the profile's known peers.
    let dial_tx_clone = dial_tx.clone();
    for peer in &initial_peers {
        let _ = dial_tx_clone.send(PeerDialRequest {
            peer: peer.clone(),
            gossip_hint: false,
        });
    }
    for peer in profile_peers {
        let peer_key = peer_key(&peer);
        let is_dead = {
            let map = alive_table.lock().await;
            map.get(&peer_key).map(|p| p.dead).unwrap_or(false)
        };
        // Informational only: dead peers are still dialed (resurrection ping).
        if is_dead {
            debug!(
                peer = %PeerLog(&peer),
                "sending startup resurrection ping for dead peer"
            );
        }
        let _ = dial_tx_clone.send(PeerDialRequest {
            peer,
            gossip_hint: false,
        });
    }
    // Forever: on each alive-table broadcast, reload the persisted peer list
    // and re-request dials; the worker above dedupes anything already handled.
    let mut peer_update_rx = alive_tx.subscribe();
    loop {
        if peer_update_rx.recv().await.is_ok()
            && let Ok(current_peers) = load_peers(&profile)
        {
            for p in current_peers {
                let _ = dial_tx_clone.send(PeerDialRequest {
                    peer: p,
                    gossip_hint: false,
                });
            }
        }
    }
}
/// Launch a dedicated dialer task for `peer`, register its join handle in the
/// shared connection map, and arrange for the entry to be reaped once the
/// dialer finishes.
#[allow(clippy::too_many_arguments)]
async fn spawn_peer_dialer(
    cfg: WorkerConfig,
    peer: ManagerPeerEntry,
    self_meta: ManagerPeerEntry,
    alive_table: AliveTable,
    alive_tx: AliveTx,
    workers: WorkerTable,
    profile: String,
    peer_version: Arc<AtomicU64>,
    proto_pref: Option<Protocol>,
    peer_connections: Arc<Mutex<std::collections::HashMap<String, tokio::task::JoinHandle<()>>>>,
    csk: Arc<RwLock<[u8; 32]>>,
    csk_ver: Arc<AtomicU32>,
    dial_tx: DialTx,
    health: super::HealthContext,
    gossip_hint: bool,
) {
    let map_key = peer_key(&peer);
    let display_name = peer.manager_name.clone();
    let display_id = peer.manager_id.clone();
    // The dialer signals completion over this oneshot so the reaper task
    // knows when to drop the map entry.
    let (done_tx, done_rx) = tokio::sync::oneshot::channel();
    let dialer = tokio::spawn(async move {
        run_peer_dialer(
            cfg,
            peer,
            self_meta,
            alive_table,
            alive_tx,
            workers,
            profile,
            peer_version,
            proto_pref,
            csk,
            csk_ver,
            dial_tx,
            health,
            gossip_hint,
        )
        .await;
        let _ = done_tx.send(());
    });
    peer_connections.lock().await.insert(map_key.clone(), dialer);
    debug!(
        "Started dialer for peer {} ({})",
        display_name,
        short_id(&display_id)
    );
    // Reaper: once the dialer exits, remove its map entry and join the task.
    let connections = peer_connections.clone();
    tokio::spawn(async move {
        let _ = done_rx.await;
        let finished = connections.lock().await.remove(&map_key);
        if let Some(handle) = finished {
            let _ = handle.await;
        }
    });
}
/// Per-peer dial loop: repeatedly connects to `peer`, hands each successful
/// connection to `handle_manager_as_client`, and tracks liveness (failure
/// counts, dead marking, resurrection) in the shared alive table.
///
/// Exits only when the peer has been dead longer than `DEAD_PEER_MAX_AGE`,
/// or when its TLS certificate cannot be base64-decoded (the cert comes from
/// the immutable `peer` argument, so retrying can never succeed). Otherwise
/// it loops forever with exponential backoff capped at `MAX_BACKOFF`, or at
/// `DEAD_RESURRECTION_INTERVAL` once the peer is marked dead.
#[allow(clippy::too_many_arguments)]
async fn run_peer_dialer(
    mut cfg: WorkerConfig,
    peer: ManagerPeerEntry,
    self_meta: ManagerPeerEntry,
    alive_table: AliveTable,
    alive_tx: AliveTx,
    workers: WorkerTable,
    profile: String,
    peer_version: Arc<AtomicU64>,
    proto_pref: Option<Protocol>,
    csk: Arc<RwLock<[u8; 32]>>,
    csk_ver: Arc<AtomicU32>,
    dial_tx: DialTx,
    health: super::HealthContext,
    gossip_hint: bool,
) {
    let key = peer_key(&peer);
    // A gossip-triggered dial skips the first backoff sleep.
    let mut skip_sleep_once = gossip_hint;
    let mut state = {
        let map = alive_table.lock().await;
        map.get(&key).cloned().unwrap_or_default()
    };
    let mut backoff = 1u64;
    if state.dead {
        if let Some(last_failure) = state.last_connect_failure {
            let age = now_secs().saturating_sub(last_failure);
            if age > DEAD_PEER_MAX_AGE {
                debug!(
                    peer = %PeerLog(&peer),
                    age_days = age / (24 * 3600),
                    "peer has been dead for over 30 days, stopping resurrection attempts"
                );
                return;
            }
        }
        backoff = DEAD_RESURRECTION_INTERVAL;
    }
    if gossip_hint && state.dead {
        debug!(
            peer = %PeerLog(&peer),
            "received gossip hint for dead peer, accelerating resurrection attempt"
        );
        backoff = 1;
    }
    debug!(
        "Starting mesh dialer for peer {} ({})",
        peer.manager_name,
        short_id(&peer.manager_id)
    );
    loop {
        cfg.host = peer.host.clone();
        cfg.tcp_port = peer.tcp_port;
        cfg.quic_port = peer.quic_port;
        cfg.cert = match base64::engine::general_purpose::STANDARD_NO_PAD.decode(&peer.tls_cert) {
            Ok(cert_data) => cert_data,
            Err(e) => {
                // BUGFIX: previously `continue`d here with no sleep, spinning
                // in a tight error loop. The cert is fixed for the lifetime of
                // this dialer, so decoding will never succeed — exit instead.
                // A later dial request with a corrected entry spawns a fresh
                // dialer.
                tracing::error!("Failed to decode peer certificate: {}", e);
                return;
            }
        };
        {
            // Mint a fresh dial token under the current cluster signing key.
            let key = *csk.read().await;
            cfg.token = match dial_token(&key, &self_meta) {
                Ok(tok) => tok,
                Err(e) => {
                    tracing::error!("Failed to issue dial token: {}", e);
                    String::new()
                }
            };
        }
        cfg.fingerprint = peer.tls_fp.clone();
        state.last_connect_attempt = Some(now_secs());
        {
            let mut map = alive_table.lock().await;
            map.insert(key.clone(), state.clone());
        }
        // Only log the first attempt of a streak (or any dead-peer attempt).
        if state.consecutive_failures == 0 || state.dead {
            debug!(
                "Connecting to peer {} ({}) [attempt {}]",
                peer.manager_name,
                short_id(&peer.manager_id),
                state.consecutive_failures + 1
            );
        }
        let connection_result = try_connect_with_fallback(&cfg, &proto_pref).await;
        match connection_result {
            Ok((tr, remote_peer_id)) => {
                // BUGFIX: capture pre-reset liveness before clearing it. The
                // old code tested `state.consecutive_failures`/`state.dead`
                // AFTER resetting them, so the "reconnected" branches were
                // unreachable and downtime was always computed as 0.
                let was_dead = state.dead;
                let prev_failures = state.consecutive_failures;
                let prev_success = state.last_connect_success;
                backoff = 1;
                state.last_connect_success = Some(now_secs());
                state.consecutive_failures = 0;
                state.dead = false;
                let prev_state = state.conn_state;
                // Snapshot the active-peer count before flipping our state so
                // the gain log below can show the transition.
                let before_active = if prev_state == ConnectionState::Inactive {
                    let map = alive_table.lock().await;
                    map.values()
                        .filter(|p| p.conn_state != ConnectionState::Inactive)
                        .count()
                } else {
                    0
                };
                state.conn_state = ConnectionState::Client;
                {
                    let mut map = alive_table.lock().await;
                    map.insert(key.clone(), state.clone());
                }
                if prev_state == ConnectionState::Inactive {
                    let after_active = {
                        let map = alive_table.lock().await;
                        map.values()
                            .filter(|p| p.conn_state != ConnectionState::Inactive)
                            .count()
                    };
                    info!(
                        "Manager {} gained {} peer(s): {} -> {}",
                        profile, 1, before_active, after_active
                    );
                }
                if prev_failures > 0 || was_dead {
                    // Downtime measured since the previous successful connect.
                    let downtime = prev_success
                        .map(|t| now_secs().saturating_sub(t))
                        .unwrap_or(0);
                    if was_dead {
                        info!(
                            peer = %PeerLog(&peer),
                            downtime_secs = downtime,
                            "peer reconnected after being dead"
                        );
                    } else {
                        info!(
                            peer = %PeerLog(&peer),
                            downtime_secs = downtime,
                            failures = prev_failures,
                            "peer reconnected"
                        );
                    }
                } else {
                    info!(
                        peer = %PeerLog(&peer),
                        "peer connected"
                    );
                }
                let ctx = ManagerContext::new_peer(
                    super::SecurityContext {
                        signing: None,
                        csk: csk.clone(),
                        csk_ver: csk_ver.clone(),
                    },
                    super::StateContext {
                        peers: alive_table.clone(),
                        workers: workers.clone(),
                        self_meta: self_meta.clone(),
                        peer_version: peer_version.clone(),
                        command_distributor: None,
                    },
                    super::CommunicationContext {
                        alive_tx: alive_tx.clone(),
                        dial_tx: dial_tx.clone(),
                        profile: profile.clone(),
                    },
                    cfg.clone(),
                    health.clone(),
                );
                // Drive the connection until it closes or errors.
                let result = handle_manager_as_client(ctx, tr, remote_peer_id).await;
                if let Err(e) = result {
                    if e.to_string().contains("connection lost") {
                        debug!(
                            "Connection lost for peer {} ({}): {}",
                            peer.manager_name,
                            short_id(&peer.manager_id),
                            e
                        );
                    } else {
                        tracing::error!(
                            "Manager peer handler failed for {} ({}): {} - connection details: host={}:{}",
                            peer.manager_name,
                            peer.manager_id,
                            e,
                            peer.host,
                            peer.quic_port
                        );
                    }
                }
                state.conn_state = ConnectionState::Inactive;
                {
                    let mut map = alive_table.lock().await;
                    map.insert(key.clone(), state.clone());
                }
                // Rate-limit disconnect logging to once per 30 seconds.
                let now = now_secs();
                let should_log_disconnect = state
                    .last_disconnect_log
                    .map(|last_log| now.saturating_sub(last_log) >= 30)
                    .unwrap_or(true);
                if should_log_disconnect {
                    info!(
                        peer = %PeerLog(&peer),
                        "peer disconnected, will retry"
                    );
                    state.last_disconnect_log = Some(now);
                    let mut map = alive_table.lock().await;
                    map.insert(key.clone(), state.clone());
                }
            }
            Err(e) => {
                let was_dead = state.dead;
                state.last_connect_failure = Some(now_secs());
                state.consecutive_failures += 1;
                if state.consecutive_failures >= DEAD_FAILURE_THRESHOLD || state.dead {
                    // Warn exactly once, on the transition into the dead state.
                    if !was_dead {
                        warn!(
                            peer = %PeerLog(&peer),
                            failures = state.consecutive_failures,
                            "peer marked as dead after consecutive failures"
                        );
                    }
                    state.dead = true;
                    backoff = DEAD_RESURRECTION_INTERVAL;
                } else {
                    if state.consecutive_failures == 1 {
                        debug!(
                            peer = %PeerLog(&peer),
                            "initial connection failure: {}",
                            e
                        );
                    }
                    backoff = (backoff * 2).min(MAX_BACKOFF);
                }
                {
                    let mut map = alive_table.lock().await;
                    map.insert(key.clone(), state.clone());
                }
            }
        }
        if state.dead && backoff == DEAD_RESURRECTION_INTERVAL {
            debug!(
                peer = %PeerLog(&peer),
                "retrying dead peer (5 minute interval)"
            );
        }
        if skip_sleep_once {
            skip_sleep_once = false;
        } else {
            sleep_backoff(backoff).await;
        }
    }
}
/// Connect using the preferred protocol (QUIC unless `proto_pref` says
/// otherwise), falling back to the other protocol when the first attempt
/// fails. When both fail, the fallback attempt's error is returned.
pub async fn try_connect_with_fallback(
    cfg: &WorkerConfig,
    proto_pref: &Option<Protocol>,
) -> Result<(Box<dyn Transport>, String), Report> {
    let preferred = proto_pref.as_ref().unwrap_or(&Protocol::Quic);
    if let Ok(result) = match preferred {
        Protocol::Quic => connect_manager_quic(cfg).await,
        Protocol::Tcp => connect_manager_tcp(cfg).await,
    } {
        debug!("Connected to {} over {:?}", cfg.host, preferred);
        return Ok(result);
    }
    debug!(
        "Preferred protocol {:?} failed for {}, trying fallback",
        preferred, cfg.host
    );
    // Try whichever protocol was not preferred.
    let (fallback_result, fallback_proto) = match preferred {
        Protocol::Quic => (connect_manager_tcp(cfg).await, Protocol::Tcp),
        Protocol::Tcp => (connect_manager_quic(cfg).await, Protocol::Quic),
    };
    match fallback_result {
        Ok(result) => {
            debug!(
                "Connected to {} over {:?} (fallback)",
                cfg.host, fallback_proto
            );
            Ok(result)
        }
        Err(e) => {
            debug!("Both protocols failed for {}: {}", cfg.host, e);
            Err(e)
        }
    }
}