use bytes::Bytes;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::net::{Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use dashmap::{DashMap, DashSet};
use anyhow::{Context, Result};
use iroh::address_lookup::PkarrRelayClient;
use iroh::endpoint::{Connection, Endpoint, VarInt};
use iroh::protocol::{AcceptError, ProtocolHandler};
use iroh::{EndpointId, SecretKey};
use iroh_blobs::store::fs::FsStore;
use iroh_blobs::{BlobsProtocol, HashAndFormat};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::audit;
use crate::config;
use crate::control::{self, ControlMsg};
use crate::dht;
use crate::dns;
use crate::dns_config;
use crate::firewall::{self, SharedFirewall};
use crate::forward;
use crate::identity;
use crate::ipc::{self, FirewallRuleView, IpcMessage, NetworkRole, NetworkStatus, PeerStatus};
use crate::membership::{
ApprovedEntry, ApprovedList, GroupMode, IdentityProvider, IrohIdentityProvider, Member,
MemberList, canonical_group_bytes, derive_ipv6, group_blob_hash, verify_group_blob,
};
use crate::network_name;
use crate::peers::{self, PeerTable};
use crate::stats::ForwardMetrics;
use crate::transport;
use crate::tun::{self, check_cgnat_conflict};
use ray_proto::SuggestedFirewall;
mod handlers;
// Lower and upper bounds for a backoff delay (1 s and 30 s). The code that
// applies them is outside this part of the file.
const BACKOFF_INITIAL: Duration = Duration::from_secs(1);
const BACKOFF_MAX: Duration = Duration::from_secs(30);
// ALPN for device-pairing connections; the accept loop routes connections
// negotiated with it to `ProtocolRouter::accept_pair_request`.
const PAIR_ALPN: &[u8] = b"rayfish/pair/1";
/// Bundle of daemon-wide handles that the per-network accept handlers need.
///
/// Built by `DaemonState::mesh_ctx`, which clones each field from the daemon
/// state.
#[derive(Clone)]
pub(crate) struct MeshCtx {
/// Local identity provider; used here for `derive_ip` and `local_identity`.
identity: IrohIdentityProvider,
/// Table of connected peers; connections are registered with `peers.add`.
peers: PeerTable,
/// Sender handed to every `forward::ForwardCtx` (presumably packets bound
/// for the TUN device; confirm against `forward`).
tun_tx: mpsc::Sender<Bytes>,
/// Forwarding metrics shared with every `forward::ForwardCtx`.
stats: Arc<ForwardMetrics>,
/// Blob store that receives the serialized group snapshot after membership changes.
blob_store: FsStore,
/// Firewall handle shared with every `forward::ForwardCtx`.
firewall: SharedFirewall,
/// Hostname table passed to `dns::update_hostname`.
hostname_table: dns::HostnameTable,
/// Reverse-lookup table passed to `dns::update_hostname`.
reverse_table: dns::ReverseLookupTable,
/// Maps a device's endpoint id to the user identity from its device certificate.
device_user_map: peers::DeviceUserMap,
}
impl MeshCtx {
fn forward_ctx(
&self,
disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
token: CancellationToken,
) -> forward::ForwardCtx {
forward::ForwardCtx {
firewall: self.firewall.clone(),
tun_tx: self.tun_tx.clone(),
disconnect_tx,
token,
stats: self.stats.clone(),
device_user_map: self.device_user_map.clone(),
}
}
}
/// Converts members into their `config::MemberEntry` form.
///
/// Copies `identity`, `ip` and `is_coordinator`, clones `hostname`, and keeps
/// the input order.
pub(crate) fn to_member_entries<'a>(
    members: impl IntoIterator<Item = &'a Member>,
) -> Vec<config::MemberEntry> {
    let mut entries = Vec::new();
    for member in members {
        entries.push(config::MemberEntry {
            identity: member.identity,
            ip: member.ip,
            is_coordinator: member.is_coordinator,
            hostname: member.hostname.clone(),
        });
    }
    entries
}
/// Converts approved-list entries into their `config::ApprovedConfigEntry` form.
///
/// Copies `identity` and `ip`, clones `hostname`, and keeps the input order.
pub(crate) fn to_approved_entries<'a>(
    approved: impl IntoIterator<Item = &'a ApprovedEntry>,
) -> Vec<config::ApprovedConfigEntry> {
    let mut entries = Vec::new();
    for entry in approved {
        entries.push(config::ApprovedConfigEntry {
            identity: entry.identity,
            ip: entry.ip,
            hostname: entry.hostname.clone(),
        });
    }
    entries
}
/// Per-network state behind the coordinator-side accept handler.
struct CoordinatorAcceptState {
/// Daemon-wide handles (peer table, DNS tables, blob store, ...).
ctx: MeshCtx,
/// Name of the network this handler serves.
network_name: String,
/// Shared membership/approval state for the network.
state: SharedNetworkState,
/// Handed to each peer reader's forwarding context.
disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
/// Cancellation token handed to the reader tasks spawned for each peer.
token: CancellationToken,
/// Notified (`notify_one`) after a new member is admitted, when present.
dht_notify: Option<Arc<tokio::sync::Notify>>,
/// Held while loading and mutating the on-disk invite store.
invite_lock: Arc<tokio::sync::Mutex<()>>,
/// Passed through to the coordinator control reader; not otherwise used in this part of the file.
pending_pongs: Arc<DashMap<u64, tokio::sync::oneshot::Sender<()>>>,
}
impl CoordinatorAcceptState {
/// Re-registers a connection from an identity that is already in the roster.
///
/// The peer is added to the peer table synchronously. A spawned task then
/// sends it a member sync and starts the coordinator control reader and the
/// peer reader for the connection.
fn handle_known_member_reconnect(
    &self,
    conn: Connection,
    remote_id: EndpointId,
    peer_ip: Ipv4Addr,
) {
    tracing::info!(ip = %peer_ip, "known member reconnecting");
    crate::spawn_path_logger(conn.clone(), remote_id.fmt_short().to_string());

    let peer_ipv6 = derive_ipv6(&remote_id);
    self.ctx
        .peers
        .add(peer_ip, peer_ipv6, conn.clone(), remote_id, &self.network_name);

    // Everything the spawned task needs is cloned up front so it owns its data.
    let mesh = self.ctx.clone();
    let shared_state = self.state.clone();
    let network_name = self.network_name.clone();
    let cancel = self.token.clone();
    let disconnects = self.disconnect_tx.clone();
    let notify = self.dht_notify.clone();
    let invites = self.invite_lock.clone();
    let pongs = self.pending_pongs.clone();

    tokio::spawn(async move {
        send_member_sync(&conn).await;
        let forward_ctx = mesh.forward_ctx(disconnects, cancel.clone());
        spawn_coordinator_control_reader(
            conn.clone(),
            remote_id,
            peer_ip,
            network_name.clone(),
            shared_state,
            mesh,
            notify,
            cancel,
            invites,
            pongs,
        );
        forward::spawn_peer_reader(
            conn,
            remote_id,
            peer_ip,
            peer_ipv6,
            network_name,
            forward_ctx,
        );
    });
}
/// Handles one inbound connection on the coordinator.
///
/// Identities already in the roster are re-registered immediately. Any other
/// peer must send a `JoinRequest` or `MeshHello` within the timeouts below and
/// is then admitted, admitted via invite, queued for approval, or dropped.
async fn handle_connection(&self, conn: Connection) {
let remote_id = conn.remote_id();
// Copy the roster IP out so the read lock is released before any await.
let member_ip = {
let s = self.state.read().unwrap();
s.members.get(&remote_id).map(|m| m.ip)
};
// For unknown peers fall back to an IP derived from the identity.
let peer_ip = member_ip.unwrap_or_else(|| self.ctx.identity.derive_ip(&remote_id));
if member_ip.is_some() {
self.handle_known_member_reconnect(conn, remote_id, peer_ip);
return;
}
// Unknown peer: allow 5 s to open the control stream and 5 s to send the
// first message; any error or timeout drops the connection silently.
let (send, mut recv) =
match tokio::time::timeout(Duration::from_secs(5), conn.accept_bi()).await {
Ok(Ok(pair)) => pair,
_ => return,
};
let msg = match tokio::time::timeout(Duration::from_secs(5), control::recv_msg(&mut recv))
.await
{
Ok(Ok(m)) => m,
_ => return,
};
// Only JoinRequest and MeshHello are accepted here; MeshHello carries no invite.
let (invite_secret, hostname, device_cert) = match msg {
ControlMsg::JoinRequest {
invite_secret,
hostname,
device_cert,
} => (invite_secret, hostname, device_cert),
ControlMsg::MeshHello {
hostname,
device_cert,
..
} => (None, hostname, device_cert),
_ => return,
};
// A supplied device certificate must verify and must name the connecting
// endpoint as its device key; only then is the device/user mapping recorded.
if let Some(ref cert) = device_cert {
if !cert.verify() || cert.device_key != remote_id {
tracing::warn!(peer = %remote_id.fmt_short(), "invalid device certificate");
return;
}
self.ctx.device_user_map.insert(remote_id, cert.user_identity);
}
// Pre-approved identities are admitted regardless of invite or group mode.
let is_approved = self.state.read().unwrap().approved.is_approved(&remote_id);
if is_approved {
self.admit_peer(
conn,
send,
remote_id,
peer_ip,
hostname,
device_cert,
true,
false,
)
.await;
return;
}
// An invite secret takes precedence over the group mode.
if let Some(secret) = invite_secret {
self.redeem_invite_and_admit(
conn, send, remote_id, peer_ip, hostname, device_cert, secret,
)
.await;
return;
}
let mode = self.state.read().unwrap().mode;
match mode {
GroupMode::Open => {
self.admit_peer(
conn,
send,
remote_id,
peer_ip,
hostname,
device_cert,
false,
false,
)
.await;
}
GroupMode::Restricted => {
// Record the request; the write lock is scoped so it is released
// before the awaits below.
{
let mut s = self.state.write().unwrap();
s.pending.insert(
remote_id,
PendingJoin {
hostname,
device_cert,
requested_at: Instant::now(),
},
);
}
tracing::info!(peer = %remote_id.fmt_short(), ip = %peer_ip, "join queued for approval");
let mut send = send;
let _ = control::send_msg(&mut send, &ControlMsg::JoinPending).await;
// Wait up to 5 s for the connection to close before dropping it
// (presumably so the reply is delivered first; confirm).
let _ = tokio::time::timeout(Duration::from_secs(5), conn.closed()).await;
}
}
}
/// Tries to admit a joining peer using the invite secret it presented.
///
/// The secret is first redeemed against the on-disk invite store. If that
/// fails it is checked against the network's reusable keys. If neither
/// accepts it, the peer is sent `JoinDenied`.
#[allow(clippy::too_many_arguments)]
async fn redeem_invite_and_admit(
&self,
conn: Connection,
send: iroh::endpoint::SendStream,
remote_id: EndpointId,
peer_ip: Ipv4Addr,
hostname: Option<String>,
device_cert: Option<control::DeviceCert>,
secret: Vec<u8>,
) {
// Load and redeem under `invite_lock`; the guard is dropped at the end of
// this block, before `admit_peer` runs.
let redeemed = {
let _guard = self.invite_lock.lock().await;
match crate::invite::InviteStore::load(&self.network_name) {
Ok(mut store) => store.redeem(&secret, remote_id),
Err(e) => Err(e),
}
};
match redeemed {
Ok(invite_hostname) => {
tracing::info!(peer = %remote_id.fmt_short(), "invite redeemed");
// A hostname stored on the invite wins over the one the peer asked
// for, and is passed to admission as authoritative.
let authoritative = invite_hostname.is_some();
let assigned = invite_hostname.or(hostname);
let admitted = self
.admit_peer(
conn,
send,
remote_id,
peer_ip,
assigned,
device_cert,
false,
authoritative,
)
.await;
if !admitted {
// Admission failed after the invite was consumed: put it back.
// Load or restore errors are ignored (best effort).
let _guard = self.invite_lock.lock().await;
if let Ok(mut store) = crate::invite::InviteStore::load(&self.network_name) {
let _ = store.restore(&secret);
}
} else {
// Tell the other coordinators the invite was used; only a hash
// of the secret is sent.
let secret_hash = crate::invite::hash_secret(&secret);
let members = self.state.read().unwrap().roster();
gossip_to_coordinators(
&self.ctx.peers,
&self.network_name,
&members,
self.ctx.identity.local_identity(),
&ControlMsg::InviteUsed {
secret_hash: secret_hash.into_bytes(),
},
)
.await;
}
}
Err(single_use_err) => {
// Not a usable single-use invite: fall back to the reusable keys.
// Only the key id is cloned out so the read lock is not held
// across the awaits below.
let reusable_id = {
let s = self.state.read().unwrap();
crate::membership::validate_reusable_key(&s.reusable_keys, &secret, now_secs())
.map(|k| k.id.clone())
};
if let Some(key_id) = reusable_id {
tracing::info!(
peer = %remote_id.fmt_short(),
key_id = %key_id,
"reusable key redeemed"
);
self.admit_peer(
conn, send, remote_id, peer_ip, hostname, device_cert, false, false,
)
.await;
} else {
// The denial reports the single-use redemption error.
tracing::warn!(peer = %remote_id.fmt_short(), error = %single_use_err, "invite rejected");
self.deny(&conn, send, format!("invite rejected: {single_use_err}"))
.await;
}
}
}
}
/// Sends `JoinDenied` with `reason`, then waits up to five seconds for the
/// peer to close the connection. Send errors and the timeout are ignored.
async fn deny(&self, conn: &Connection, mut send: iroh::endpoint::SendStream, reason: String) {
    let denial = ControlMsg::JoinDenied { reason };
    let _ = control::send_msg(&mut send, &denial).await;

    let grace = Duration::from_secs(5);
    let _ = tokio::time::timeout(grace, conn.closed()).await;
}
/// Admits `remote_id` as a member of the network and wires up its connection.
///
/// Returns `true` when the peer was admitted. Returns `false` after sending
/// `JoinDenied` because of a hostname conflict or an IP collision.
///
/// * `_suggested_ip` - unused; the IP comes from `membership::assign_ip`.
/// * `hostname` - requested hostname, resolved by `hostname::admission_hostname`.
/// * `was_approved` - when set, the identity is removed from the approved list.
/// * `authoritative` - forwarded to `admission_hostname`.
///
/// NOTE(review): the IP assignment, hostname check and collision check each
/// take their own read lock and the insert takes a later write lock, so they
/// are not one atomic step. The result of `members.add` is discarded.
#[allow(clippy::too_many_arguments)]
async fn admit_peer(
&self,
conn: Connection,
mut send: iroh::endpoint::SendStream,
remote_id: EndpointId,
_suggested_ip: Ipv4Addr,
hostname: Option<String>,
device_cert: Option<control::DeviceCert>,
was_approved: bool,
authoritative: bool,
) -> bool {
let (peer_ip, collision_index) = {
let s = self.state.read().unwrap();
crate::membership::assign_ip(&s.members, &remote_id)
};
// Resolve the hostname against names held by other members (the peer's
// own existing entry is excluded).
let final_hostname = if let Some(desired) = hostname {
let taken = {
let s = self.state.read().unwrap();
s.members
.all()
.iter()
.filter(|m| m.identity != remote_id)
.filter_map(|m| m.hostname.clone())
.collect::<Vec<String>>()
};
let taken_refs: Vec<&str> = taken.iter().map(|s| s.as_str()).collect();
match crate::hostname::admission_hostname(&desired, &taken_refs, authoritative) {
Ok(name) => Some(name),
Err(conflict) => {
self.deny(
&conn,
send,
format!("hostname '{conflict}' is already in use on this network"),
)
.await;
return false;
}
}
} else {
None
};
// The chosen IP must not belong to a different identity in either the
// member list or the approved list.
let collision = {
let s = self.state.read().unwrap();
if let Some(existing) = s.members.get_by_ip(peer_ip) {
existing.identity != remote_id
} else if let Some(existing) = s.approved.get_by_ip(peer_ip) {
existing.identity != remote_id
} else {
false
}
};
if collision {
self.deny(
&conn,
send,
format!("IP collision: {peer_ip} already assigned"),
)
.await;
return false;
}
let user_id_opt = device_cert.as_ref().map(|c| c.user_identity);
// Mutate membership under the write lock and copy the refreshed snapshot
// bytes out, so the lock is released before the blob store await.
let snap_bytes = {
let mut s = self.state.write().unwrap();
if was_approved {
s.approved.remove(&remote_id);
}
s.pending.remove(&remote_id);
let _ = s.members.add(Member {
identity: remote_id,
ip: peer_ip,
is_coordinator: false,
hostname: final_hostname.clone(),
user_identity: user_id_opt,
device_cert: device_cert.clone(),
collision_index,
});
s.refresh_snapshot();
s.snapshot.as_ref().map(|snap| snap.msgpack_bytes.clone())
};
// Store the new snapshot in the blob store; a failure is ignored.
if let Some(bytes) = snap_bytes {
let _ = self.ctx.blob_store.blobs().add_slice(&bytes).await;
}
if let Some(ref h) = final_hostname {
dns::update_hostname(
&self.ctx.hostname_table,
&self.ctx.reverse_table,
&self.network_name,
h,
peer_ip,
derive_ipv6(&remote_id),
)
.await;
}
// Announce the new member to the peers currently in the peer table; the
// new peer itself is only added to the table further down.
broadcast_control_msg(
&self.ctx.peers,
&ControlMsg::MemberApproved {
identity: remote_id,
ip: peer_ip,
hostname: final_hostname.clone(),
device_cert: device_cert.clone(),
},
)
.await;
let (members, approved) = {
let s = self.state.read().unwrap();
(s.roster(), s.approved_snapshot())
};
tracing::info!(ip = %peer_ip, "new member admitted and joined");
// Send the joiner the current roster and approved list; a send failure is ignored.
let _ = control::send_msg(
&mut send,
&ControlMsg::Welcome {
members: members.clone(),
approved,
},
)
.await;
if let Some(notify) = &self.dht_notify {
notify.notify_one();
}
broadcast_member_sync(&self.ctx.peers, Some(peer_ip)).await;
// Register the connection and start its control and packet readers.
let peer_ipv6 = derive_ipv6(&remote_id);
crate::spawn_path_logger(conn.clone(), remote_id.fmt_short().to_string());
self.ctx.peers.add(
peer_ip,
peer_ipv6,
conn.clone(),
remote_id,
&self.network_name,
);
spawn_coordinator_control_reader(
conn.clone(),
remote_id,
peer_ip,
self.network_name.clone(),
self.state.clone(),
self.ctx.clone(),
self.dht_notify.clone(),
self.token.clone(),
self.invite_lock.clone(),
self.pending_pongs.clone(),
);
forward::spawn_peer_reader(
conn,
remote_id,
peer_ip,
peer_ipv6,
self.network_name.clone(),
self.ctx
.forward_ctx(self.disconnect_tx.clone(), self.token.clone()),
);
true
}
}
/// Per-network state behind the accept handler used on non-coordinator nodes.
struct MemberAcceptState {
/// Daemon-wide handles (peer table, DNS tables, blob store, ...).
ctx: MeshCtx,
/// Name of the network this handler serves.
network_name: String,
/// Shared membership/approval state for the network.
state: SharedNetworkState,
/// Handed to each peer reader's forwarding context.
disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
/// Cancellation token handed to each peer reader's forwarding context.
token: CancellationToken,
}
impl MemberAcceptState {
/// Adds the connection to the peer table under `ip` (and the IPv6 address
/// derived from `peer_identity`), then starts the peer reader for it.
fn register_peer(&self, conn: Connection, peer_identity: EndpointId, ip: Ipv4Addr) {
    let network = &self.network_name;
    let ipv6 = derive_ipv6(&peer_identity);

    self.ctx
        .peers
        .add(ip, ipv6, conn.clone(), peer_identity, network);

    let forward_ctx = self
        .ctx
        .forward_ctx(self.disconnect_tx.clone(), self.token.clone());
    forward::spawn_peer_reader(conn, peer_identity, ip, ipv6, network.clone(), forward_ctx);
}
/// Handles one inbound connection on a non-coordinator node.
///
/// The peer must open a bidirectional stream and send `MeshHello`. The hello
/// is accepted only when:
///
/// * any device certificate it carries verifies and names the connecting
///   endpoint as its device key;
/// * the claimed identity is either the connecting endpoint itself or the
///   user identity of that certificate; and
/// * the claimed identity is a member or on the approved list.
///
/// Local state (device/user map, DNS tables, roster hostname, peer table) is
/// only touched after all three checks pass. Previously an unverified
/// certificate could be recorded, and the DNS tables were updated for peers
/// that were neither members nor approved.
async fn handle_connection(&self, conn: Connection) {
    // `_send` is kept bound so the send half stays open for the whole handshake.
    let Ok((_send, mut recv)) = conn.accept_bi().await else {
        return;
    };
    let transport_id = conn.remote_id();
    let Ok(ControlMsg::MeshHello {
        identity: peer_identity,
        ip,
        hostname,
        device_cert,
        ..
    }) = control::recv_msg(&mut recv).await
    else {
        return;
    };

    // A certificate is never trusted unverified, even when the peer claims
    // its own transport identity (same rule as the coordinator handler).
    if let Some(ref cert) = device_cert {
        if !cert.verify() || cert.device_key != transport_id {
            tracing::warn!(peer = %transport_id.fmt_short(), "invalid device certificate");
            return;
        }
    }

    // Claiming an identity other than the transport identity is only allowed
    // through a certificate issued by that identity.
    if peer_identity != transport_id {
        match &device_cert {
            Some(cert) if cert.user_identity == peer_identity => {}
            Some(_) => {
                tracing::warn!(peer = %transport_id.fmt_short(), "invalid device certificate");
                return;
            }
            None => return,
        }
    }

    let (is_member, is_approved) = {
        let s = self.state.read().unwrap();
        (
            s.members.is_member(&peer_identity),
            s.approved.is_approved(&peer_identity),
        )
    };
    // Peers that are neither members nor approved get no local side effects.
    if !is_member && !is_approved {
        tracing::debug!(
            peer = %transport_id.fmt_short(),
            "ignoring mesh hello from peer that is neither member nor approved"
        );
        return;
    }

    if let Some(ref cert) = device_cert {
        self.ctx
            .device_user_map
            .insert(transport_id, cert.user_identity);
    }

    // Resolve the requested hostname against names held by other members.
    let final_hostname = hostname.map(|desired| {
        let taken = self.state.read().unwrap().taken_hostnames(peer_identity);
        let taken_refs: Vec<&str> = taken.iter().map(|s| s.as_str()).collect();
        crate::hostname::resolve_collision(&desired, &taken_refs)
    });

    // NOTE(review): `ip` is the address claimed in the hello. For approved
    // peers `admit_approved_member` replaces it with the approved entry's IP,
    // but the DNS update here and `register_peer` for known members use the
    // claimed value as-is. Consider using the roster IP instead.
    if let Some(ref h) = final_hostname {
        let ipv6 = derive_ipv6(&peer_identity);
        dns::update_hostname(
            &self.ctx.hostname_table,
            &self.ctx.reverse_table,
            &self.network_name,
            h,
            ip,
            ipv6,
        )
        .await;
    }

    if is_approved {
        self.admit_approved_member(conn, peer_identity, ip, final_hostname, device_cert)
            .await;
        return;
    }

    // Known member: refresh the roster hostname, then register the connection.
    // The write guard is scoped so it is dropped before `register_peer`.
    if final_hostname.is_some() {
        let mut s = self.state.write().unwrap();
        if let Some(m) = s.members.get_mut(&peer_identity) {
            m.hostname = final_hostname;
        }
    }
    self.register_peer(conn, peer_identity, ip);
}
/// Promotes a pre-approved identity to a full member on a non-coordinator
/// node, sends it `Welcome`, registers its connection and broadcasts a
/// member sync.
///
/// `ip` is only a fallback: when the approved list has an entry for
/// `peer_identity`, that entry's IP and collision index are used instead.
async fn admit_approved_member(
&self,
conn: Connection,
peer_identity: EndpointId,
ip: Ipv4Addr,
final_hostname: Option<String>,
device_cert: Option<control::DeviceCert>,
) {
// Move the identity from the approved list to the member list under one
// write lock, and copy the refreshed snapshot bytes out for use after it.
let (snap_bytes, ip) = {
let mut s = self.state.write().unwrap();
let approved_entry = s.approved.remove(&peer_identity);
let user_id_opt = device_cert.as_ref().map(|c| c.user_identity);
let (member_ip, member_idx) = approved_entry
.as_ref()
.map(|e| (e.ip, e.collision_index))
.unwrap_or((ip, 0));
// The result of `add` is discarded.
let _ = s.members.add(Member {
identity: peer_identity,
ip: member_ip,
is_coordinator: false,
hostname: final_hostname.clone(),
user_identity: user_id_opt,
device_cert: device_cert.clone(),
collision_index: member_idx,
});
s.refresh_snapshot();
(
s.snapshot.as_ref().map(|snap| snap.msgpack_bytes.clone()),
member_ip,
)
};
// Store the new snapshot in the blob store; a failure is ignored.
if let Some(bytes) = snap_bytes {
let _ = self.ctx.blob_store.blobs().add_slice(&bytes).await;
}
let (members, approved_list) = {
let s = self.state.read().unwrap();
(s.roster(), s.approved_snapshot())
};
// The welcome goes out on a stream opened by this side; if the stream
// cannot be opened or the send fails, admission continues anyway.
if let Ok((mut send, _)) = conn.open_bi().await {
let _ = control::send_msg(
&mut send,
&ControlMsg::Welcome {
members,
approved: approved_list,
},
)
.await;
}
self.register_peer(conn, peer_identity, ip);
broadcast_member_sync(&self.ctx.peers, Some(ip)).await;
}
}
/// Which accept logic a network's ALPN is served with.
enum AcceptHandler {
/// Coordinator-side handling: joins, invites and approvals.
Coordinator(Arc<CoordinatorAcceptState>),
/// Member-side handling: `MeshHello` from members and approved peers.
Member(Arc<MemberAcceptState>),
}
#[cfg(test)]
impl AcceptHandler {
    /// Test helper: `true` for the `Coordinator` variant, `false` for `Member`.
    fn is_coordinator(&self) -> bool {
        match self {
            AcceptHandler::Coordinator(_) => true,
            AcceptHandler::Member(_) => false,
        }
    }
}
/// Protocol handler registered per network ALPN; wraps the accept logic
/// chosen for that network.
struct MeshProtocol {
/// Coordinator or member accept state that connections are delegated to.
handler: AcceptHandler,
}
impl std::fmt::Debug for MeshProtocol {
    /// Prints just the type name; the wrapped handler state is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits exactly the name.
        f.write_str("MeshProtocol")
    }
}
impl ProtocolHandler for MeshProtocol {
    /// Delegates the connection to the coordinator or member accept state.
    /// Always returns `Ok(())`; the accept states report problems via logging.
    async fn accept(&self, conn: Connection) -> Result<(), AcceptError> {
        match self.handler {
            AcceptHandler::Coordinator(ref coordinator) => {
                coordinator.handle_connection(conn).await
            }
            AcceptHandler::Member(ref member) => member.handle_connection(conn).await,
        };
        Ok(())
    }
}
/// A file offer received from a peer and recorded by `accept_file_offer`.
struct PendingFile {
/// Local id taken from the router's `file_id_counter`.
id: u64,
/// Sender; checked against the connection's remote id before being stored.
from: EndpointId,
/// File name as stated in the offer.
filename: String,
/// Size as stated in the offer.
size: u64,
/// MIME type as stated in the offer.
mime_type: String,
/// Hash of the offered blob as stated in the offer.
blob_hash: blake3::Hash,
}
/// A connect request recorded by `accept_connect_request` that has no
/// approval on file yet.
#[derive(Clone)]
struct PendingConnect {
/// Contact id stated in the request.
from_contact_id: EndpointId,
/// Requesting endpoint; equals the connection's remote id.
from_endpoint: EndpointId,
/// Optional hostname stated in the request.
hostname: Option<String>,
/// When the request was received.
requested_at: Instant,
}
/// Dispatches accepted connections by ALPN and holds the state the built-in
/// handlers (files, pairing, connect) need.
struct ProtocolRouter {
/// Handler for the iroh-blobs ALPN.
blobs: BlobsProtocol,
/// Per-network handlers keyed by ALPN bytes.
handlers: DashMap<Vec<u8>, Arc<MeshProtocol>>,
/// File offers received via `accept_file_offer`.
pending_files: Arc<std::sync::Mutex<Vec<PendingFile>>>,
/// Source of `PendingFile::id` values; starts at 1.
file_id_counter: Arc<AtomicU64>,
/// Secret of the active pairing session, if any; taken (cleared) by the
/// first pair request that arrives.
pairing_secret: Arc<std::sync::Mutex<Option<[u8; 32]>>>,
/// Key used to create device certificates for paired devices.
secret_key: SecretKey,
/// Connect requests awaiting a decision, keyed by requesting endpoint.
pending_connects: Arc<DashMap<EndpointId, PendingConnect>>,
/// Approved connects keyed by requesting endpoint; the value is returned
/// to the requester as `(room_id, coordinator)`.
approved_connects: Arc<DashMap<EndpointId, (EndpointId, EndpointId)>>,
/// Not used in this part of the file.
outgoing_connects: Arc<DashSet<EndpointId>>,
/// Shared with coordinator accept states; not otherwise used in this part of the file.
pending_pongs: Arc<DashMap<u64, tokio::sync::oneshot::Sender<()>>>,
}
impl ProtocolRouter {
/// Creates a router with no network handlers, no pending files or connects,
/// and a file id counter starting at 1.
fn new(
    blobs: BlobsProtocol,
    secret_key: SecretKey,
    pairing_secret: Arc<std::sync::Mutex<Option<[u8; 32]>>>,
) -> Self {
    let handlers = DashMap::new();
    let pending_files = Arc::new(std::sync::Mutex::new(Vec::new()));
    let file_id_counter = Arc::new(AtomicU64::new(1));
    let pending_connects = Arc::new(DashMap::new());
    let approved_connects = Arc::new(DashMap::new());
    let outgoing_connects = Arc::new(DashSet::new());
    let pending_pongs = Arc::new(DashMap::new());
    Self {
        blobs,
        handlers,
        pending_files,
        file_id_counter,
        pairing_secret,
        secret_key,
        pending_connects,
        approved_connects,
        outgoing_connects,
        pending_pongs,
    }
}
/// Installs `handler` for `alpn`, replacing any handler already registered
/// under the same ALPN.
fn register(&self, alpn: Vec<u8>, handler: AcceptHandler) {
    let protocol = Arc::new(MeshProtocol { handler });
    self.handlers.insert(alpn, protocol);
}
/// Removes the handler registered for `alpn`, if there is one.
fn unregister(&self, alpn: &[u8]) {
    drop(self.handlers.remove(alpn));
}
/// Lists every ALPN the router serves: the registered network ALPNs followed
/// by the built-in blobs, files, pairing and connect ALPNs, in that order.
fn alpns(&self) -> Vec<Vec<u8>> {
    let built_in = [
        iroh_blobs::protocol::ALPN.to_vec(),
        transport::FILES_ALPN.to_vec(),
        PAIR_ALPN.to_vec(),
        transport::CONNECT_ALPN.to_vec(),
    ];
    self.handlers
        .iter()
        .map(|entry| entry.key().clone())
        .chain(built_in)
        .collect()
}
/// Reads one `FileOffer` from the connection and queues it in
/// `pending_files` under a fresh id.
///
/// Offers whose `from` field differs from the connection's remote id are
/// logged and dropped, as are other message types and read failures.
async fn accept_file_offer(&self, conn: Connection) {
    let remote_id = conn.remote_id();

    let (_send, mut recv) = match conn.accept_bi().await {
        Ok(streams) => streams,
        Err(e) => {
            tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to accept bi stream for file offer");
            return;
        }
    };

    let message = match control::recv_msg(&mut recv).await {
        Ok(message) => message,
        Err(e) => {
            tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to read file offer");
            return;
        }
    };

    match message {
        control::ControlMsg::FileOffer {
            from,
            filename,
            size,
            mime_type,
            blob_hash,
        } if from == remote_id => {
            let id = self.file_id_counter.fetch_add(1, Ordering::Relaxed);
            tracing::info!(from = %from.fmt_short(), filename = %filename, size, "file offer received");
            self.pending_files.lock().unwrap().push(PendingFile {
                id,
                from,
                filename,
                size,
                mime_type,
                blob_hash,
            });
        }
        control::ControlMsg::FileOffer { from, .. } => {
            tracing::warn!(claimed = %from.fmt_short(), actual = %remote_id.fmt_short(), "file offer identity mismatch");
        }
        other => {
            tracing::warn!(msg = ?other, "unexpected control message on FILES_ALPN");
        }
    }
}
/// Serves one device-pairing request.
///
/// Wire format in both directions: a 4-byte big-endian length followed by a
/// MessagePack-encoded `control::PairMsg`.
///
/// The stored pairing secret is taken (cleared) by the first well-formed
/// `Request`, whether or not its secret matches, so each pairing session
/// allows a single attempt. On a match, a device certificate is created for
/// the requested device key and sent back.
///
/// The length prefix comes from an unauthenticated peer, so it is checked
/// against `MAX_PAIR_REQUEST_LEN` before any buffer is allocated.
async fn accept_pair_request(&self, conn: Connection) {
    /// Upper bound on the encoded request body; a request carries only a
    /// secret and a public key, so this is far above any legitimate size.
    const MAX_PAIR_REQUEST_LEN: usize = 64 * 1024;

    let pairing_secret = self.pairing_secret.clone();
    let secret_key = self.secret_key.clone();
    let remote_id = conn.remote_id();

    let (mut send, mut recv) = match conn.accept_bi().await {
        Ok(streams) => streams,
        Err(e) => {
            tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to accept bi stream for pairing");
            return;
        }
    };

    let mut len_buf = [0u8; 4];
    if let Err(e) = recv.read_exact(&mut len_buf).await {
        tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to read pair request length");
        return;
    }
    let body_len = u32::from_be_bytes(len_buf) as usize;
    // Refuse oversized bodies before allocating: without this a peer could
    // make the daemon allocate up to 4 GiB per connection.
    if body_len > MAX_PAIR_REQUEST_LEN {
        tracing::warn!(
            len = body_len,
            max = MAX_PAIR_REQUEST_LEN,
            peer = %remote_id.fmt_short(),
            "pair request too large"
        );
        return;
    }
    let mut body = vec![0u8; body_len];
    if let Err(e) = recv.read_exact(&mut body).await {
        tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to read pair request body");
        return;
    }
    let request: control::PairMsg = match rmp_serde::from_slice(&body) {
        Ok(r) => r,
        Err(e) => {
            tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to decode pair request");
            return;
        }
    };

    let control::PairMsg::Request {
        secret,
        device_pubkey,
    } = request
    else {
        tracing::warn!(peer = %remote_id.fmt_short(), "unexpected pair message type");
        return;
    };

    // `take()` ends the pairing session even when the secret is wrong. The
    // mutex guard is a temporary and is released before any await below.
    let stored = pairing_secret.lock().unwrap().take();
    match stored {
        Some(expected) if expected == secret => {}
        Some(_) => {
            tracing::warn!(peer = %remote_id.fmt_short(), "pairing secret mismatch");
            return;
        }
        None => {
            tracing::warn!(peer = %remote_id.fmt_short(), "no pairing session active");
            return;
        }
    }

    let cert = control::DeviceCert::create(&secret_key, &device_pubkey);
    let response = control::PairMsg::Response { cert };
    let response_bytes = match rmp_serde::to_vec_named(&response) {
        Ok(b) => b,
        Err(e) => {
            tracing::warn!(error = %e, "failed to encode pair response");
            return;
        }
    };
    // Checked conversion so the length prefix can never be silently truncated.
    let Ok(response_len) = u32::try_from(response_bytes.len()) else {
        tracing::warn!(len = response_bytes.len(), "pair response too large to frame");
        return;
    };
    if let Err(e) = send.write_all(&response_len.to_be_bytes()).await {
        tracing::warn!(error = %e, "failed to send pair response length");
        return;
    }
    if let Err(e) = send.write_all(&response_bytes).await {
        tracing::warn!(error = %e, "failed to send pair response body");
        return;
    }
    let _ = send.finish();
    // Wait up to 5 s for the peer to close before the connection is dropped.
    let _ = tokio::time::timeout(Duration::from_secs(5), conn.closed()).await;
    tracing::info!(device = %device_pubkey.fmt_short(), "device paired successfully");
}
/// Serves one connect request.
///
/// A request whose `from_endpoint` differs from the connection's remote id
/// is answered with `Denied`. If an approval is already on file for the
/// endpoint, it is returned as `Approved`. Otherwise the request is stored
/// in `pending_connects` and answered with `Pending`. After a successful
/// reply the handler waits up to five seconds for the peer to close.
async fn accept_connect_request(&self, conn: Connection) {
    let remote_id = conn.remote_id();

    let (mut send, mut recv) = match conn.accept_bi().await {
        Ok(streams) => streams,
        Err(e) => {
            tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to accept bi stream for connect");
            return;
        }
    };

    let request: control::ConnectMsg = match control::recv_framed(&mut recv).await {
        Ok(r) => r,
        Err(e) => {
            tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to read connect request");
            return;
        }
    };

    let control::ConnectMsg::Request {
        from_contact_id,
        from_endpoint,
        hostname,
    } = request
    else {
        tracing::warn!(peer = %remote_id.fmt_short(), "unexpected connect message type");
        return;
    };

    if from_endpoint != remote_id {
        tracing::warn!(claimed = %from_endpoint.fmt_short(), actual = %remote_id.fmt_short(), "connect request endpoint mismatch");
        let denial = control::ConnectMsg::Denied {
            reason: "endpoint mismatch".to_string(),
        };
        let _ = control::send_framed(&mut send, &denial).await;
        return;
    }

    // Copy the approval out in its own statement so the map guard is released
    // before anything else happens.
    let existing_approval = self
        .approved_connects
        .get(&from_endpoint)
        .map(|entry| *entry.value());
    let reply = match existing_approval {
        Some((room_id, coordinator)) => control::ConnectMsg::Approved {
            room_id,
            coordinator,
        },
        None => {
            self.pending_connects.insert(
                from_endpoint,
                PendingConnect {
                    from_contact_id,
                    from_endpoint,
                    hostname,
                    requested_at: Instant::now(),
                },
            );
            tracing::info!(from = %from_contact_id.fmt_short(), endpoint = %from_endpoint.fmt_short(), "connect request received");
            control::ConnectMsg::Pending
        }
    };

    if let Err(e) = control::send_framed(&mut send, &reply).await {
        tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to send connect reply");
        return;
    }
    let _ = tokio::time::timeout(Duration::from_secs(5), conn.closed()).await;
}
/// Spawns the task that accepts incoming connections on `endpoint` and
/// dispatches each one by its negotiated ALPN.
///
/// The loop ends when `cancel` is cancelled or `endpoint.accept()` yields
/// `None`. Every incoming connection is completed and served in its own
/// spawned task.
fn spawn_accept_loop(
self: &Arc<Self>,
endpoint: Endpoint,
cancel: CancellationToken,
) -> tokio::task::JoinHandle<()> {
let router = self.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => return,
incoming = endpoint.accept() => {
let Some(incoming) = incoming else { return };
let router = router.clone();
// Finish the handshake off the accept loop so one slow peer does
// not hold up other incoming connections.
tokio::spawn(async move {
let conn = match incoming.await {
Ok(c) => c,
Err(e) => {
tracing::debug!(error = ?e, "incoming handshake failed");
return;
}
};
let alpn = conn.alpn().to_vec();
// Built-in ALPNs are matched first; everything else is looked up in
// the per-network handler map.
match alpn.as_slice() {
a if a == iroh_blobs::protocol::ALPN => {
let _ = router.blobs.clone().accept(conn).await;
}
a if a == transport::FILES_ALPN => router.accept_file_offer(conn).await,
a if a == PAIR_ALPN => router.accept_pair_request(conn).await,
a if a == transport::CONNECT_ALPN => router.accept_connect_request(conn).await,
_ => {
// The handler Arc is cloned out so the map guard is not held
// across the await.
if let Some(handler) = router.handlers.get(&alpn).map(|r| r.clone()) {
let _ = handler.accept(conn).await;
} else {
tracing::warn!(
alpn = %String::from_utf8_lossy(&alpn),
"no handler for ALPN"
);
}
}
}
});
}
}
}
})
}
}
/// Serialized form of the group state, produced by
/// `NetworkState::refresh_snapshot`.
#[derive(Clone)]
struct GroupSnapshot {
/// BLAKE3 hash of `msgpack_bytes`.
hash: blake3::Hash,
/// Output of `canonical_group_bytes` for the current state.
msgpack_bytes: Vec<u8>,
}
/// `NetworkState` shared between tasks behind a std `RwLock`. The code in
/// this file scopes guards so they are released before any `.await`.
pub(crate) type SharedNetworkState = Arc<std::sync::RwLock<NetworkState>>;
/// Membership and policy state of one network.
pub(crate) struct NetworkState {
/// Current members.
members: MemberList,
/// Identities approved to join that have not yet become members.
approved: ApprovedList,
/// Latest serialized snapshot; set by `refresh_snapshot`.
snapshot: Option<GroupSnapshot>,
/// Not used in this part of the file.
network_secret_key: Option<SecretKey>,
/// Not used in this part of the file.
network_public_key: EndpointId,
/// Optional network name; included in the snapshot.
network_name: Option<String>,
/// Decides whether unknown joiners are admitted (`Open`) or queued (`Restricted`).
mode: GroupMode,
/// Suggested firewall settings; included in the snapshot.
suggested_firewall: SuggestedFirewall,
/// Reusable join keys; included in the snapshot and checked during invite redemption.
reusable_keys: BTreeMap<String, crate::membership::ReusableKey>,
/// Not used in this part of the file.
pending_suggestions: Vec<firewall::FirewallRule>,
/// Join requests awaiting approval, keyed by requester identity.
pending: HashMap<EndpointId, PendingJoin>,
}
/// A join request queued for approval on a `Restricted` network.
struct PendingJoin {
/// Hostname the joiner asked for, if any.
hostname: Option<String>,
/// Device certificate the joiner presented, if any (already verified by
/// the coordinator accept handler before queuing).
device_cert: Option<control::DeviceCert>,
/// When the request was queued.
requested_at: Instant,
}
impl NetworkState {
    /// Returns an owned copy of every current member.
    fn roster(&self) -> Vec<Member> {
        Vec::from_iter(self.members.all().into_iter().cloned())
    }

    /// Returns an owned copy of every approved-list entry.
    fn approved_snapshot(&self) -> Vec<ApprovedEntry> {
        Vec::from_iter(self.approved.all().into_iter().cloned())
    }

    /// Collects the hostnames held by members other than `except`; members
    /// without a hostname are skipped.
    fn taken_hostnames(&self, except: EndpointId) -> Vec<String> {
        let mut names = Vec::new();
        for member in self.members.all().iter() {
            if member.identity == except {
                continue;
            }
            if let Some(name) = &member.hostname {
                names.push(name.clone());
            }
        }
        names
    }

    /// Re-serializes the group state with `canonical_group_bytes` and stores
    /// the bytes together with their BLAKE3 hash in `self.snapshot`.
    fn refresh_snapshot(&mut self) {
        let msgpack_bytes = canonical_group_bytes(
            &self.members,
            &self.approved,
            &self.suggested_firewall,
            self.network_name.as_deref(),
            &self.reusable_keys,
        );
        let hash = blake3::hash(&msgpack_bytes);
        self.snapshot = Some(GroupSnapshot {
            hash,
            msgpack_bytes,
        });
    }
}
/// Per-network runtime record stored in `DaemonState::networks`.
#[allow(dead_code)]
pub struct NetworkHandle {
/// Not read in this part of the file.
name: String,
/// Network key; the network's ALPN is derived from it.
network_key: EndpointId,
/// This node's role; set to `Coordinator` by `register_coordinator_handler`.
role: NetworkRole,
/// This node's IPv4 address on the network.
my_ip: Ipv4Addr,
/// Shared membership state for the network.
state: SharedNetworkState,
/// Optional notifier passed to the coordinator accept state and to
/// `update_snapshot_and_publish`.
dht_notify: Option<Arc<tokio::sync::Notify>>,
/// Cancellation token passed to the coordinator accept state on promotion.
cancel: CancellationToken,
/// Not read in this part of the file.
tasks: Vec<JoinHandle<()>>,
/// Lock guarding invite-store access for this network.
invite_lock: Arc<tokio::sync::Mutex<()>>,
/// Sender passed to the coordinator accept state on promotion.
disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
}
/// Top-level daemon state shared by the IPC request handlers.
///
/// Only fields whose use is visible in this part of the file are described.
pub struct DaemonState {
/// iroh endpoint; its ALPN set is updated by `refresh_alpns`.
endpoint: Endpoint,
identity: IrohIdentityProvider,
peers: PeerTable,
stats: Arc<ForwardMetrics>,
start: Instant,
tun_tx: mpsc::Sender<Bytes>,
/// Joined networks keyed by network name.
networks: Arc<DashMap<String, NetworkHandle>>,
/// Cancelled when an `IpcMessage::Shutdown` request is handled.
shutdown_token: CancellationToken,
blob_store: FsStore,
firewall: SharedFirewall,
/// ALPN dispatcher; network handlers are registered on it.
protocol_router: Arc<ProtocolRouter>,
hostname_table: dns::HostnameTable,
reverse_table: dns::ReverseLookupTable,
mdns_enabled: bool,
/// Passed to `dns_config::update_search_domains` together with the network names.
tun_name: String,
pairing_secret: Arc<std::sync::Mutex<Option<[u8; 32]>>>,
device_cert: Option<control::DeviceCert>,
device_user_map: peers::DeviceUserMap,
/// Returned, as a string, in reply to `IpcMessage::ContactId`.
contact_public: EndpointId,
active: Arc<AtomicBool>,
dns_configurator: Arc<std::sync::Mutex<Option<Box<dyn dns_config::DnsConfigurator>>>>,
resolver: std::sync::Arc<crate::dns_resolver::Resolver>,
dns_reassert_token: std::sync::Mutex<Option<tokio_util::sync::CancellationToken>>,
ssh_authz: crate::ssh::SshAuthz,
ssh_token: std::sync::Mutex<Option<tokio_util::sync::CancellationToken>>,
promote_tx: mpsc::Sender<String>,
}
/// Maps "holds the network key" to a role: key holders are coordinators,
/// everyone else is a member.
fn role_for_key_holder(holds_network_key: bool) -> NetworkRole {
    match holds_network_key {
        true => NetworkRole::Coordinator,
        false => NetworkRole::Member,
    }
}
/// Returns `true` when the public key derived from `secret_key` equals
/// `net_pubkey`.
fn admin_grant_key_valid(secret_key: [u8; 32], net_pubkey: EndpointId) -> bool {
    let derived_public = SecretKey::from(secret_key).public();
    derived_public == net_pubkey
}
/// Returns `true` only when the current role is not already coordinator.
fn should_promote(current: NetworkRole) -> bool {
    let already_coordinator = current.is_coordinator();
    !already_coordinator
}
impl DaemonState {
pub(crate) fn mesh_ctx(&self) -> MeshCtx {
MeshCtx {
identity: self.identity.clone(),
peers: self.peers.clone(),
tun_tx: self.tun_tx.clone(),
stats: self.stats.clone(),
blob_store: self.blob_store.clone(),
firewall: self.firewall.clone(),
hostname_table: self.hostname_table.clone(),
reverse_table: self.reverse_table.clone(),
device_user_map: self.device_user_map.clone(),
}
}
/// Pushes the router's current ALPN list to the endpoint, then updates the
/// DNS search domains from the names of the joined networks.
pub(crate) async fn refresh_alpns(&self) {
    let alpns = self.protocol_router.alpns();

    // Lossy UTF-8 rendering, used for the log line only.
    let mut alpn_strs: Vec<String> = Vec::with_capacity(alpns.len());
    for alpn in &alpns {
        alpn_strs.push(String::from_utf8_lossy(alpn).into_owned());
    }
    tracing::info!(alpns = ?alpn_strs, "refreshing ALPNs");
    self.endpoint.set_alpns(alpns);

    // Collected before the await so no map guard is held across it.
    let network_names: Vec<String> = self
        .networks
        .iter()
        .map(|entry| entry.key().clone())
        .collect();
    dns_config::update_search_domains(&network_names, &self.tun_name).await;
}
/// Registers a coordinator accept handler under the ALPN derived from
/// `network_key`, then marks the network's handle (if present) as
/// `Coordinator`.
#[allow(clippy::too_many_arguments)]
pub(crate) fn register_coordinator_handler(
    &self,
    network: &str,
    state: SharedNetworkState,
    invite_lock: Arc<tokio::sync::Mutex<()>>,
    dht_notify: Option<Arc<tokio::sync::Notify>>,
    network_key: EndpointId,
    disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
    cancel: CancellationToken,
) {
    let accept_state = CoordinatorAcceptState {
        ctx: self.mesh_ctx(),
        network_name: network.to_string(),
        state,
        disconnect_tx,
        token: cancel,
        dht_notify,
        invite_lock,
        pending_pongs: self.protocol_router.pending_pongs.clone(),
    };
    let alpn = transport::network_alpn(&network_key);
    self.protocol_router
        .register(alpn, AcceptHandler::Coordinator(Arc::new(accept_state)));

    if let Some(mut handle) = self.networks.get_mut(network) {
        handle.role = NetworkRole::Coordinator;
    }
}
/// Switches `network` to the coordinator accept handler.
///
/// Does nothing when the network is unknown or this node is already its
/// coordinator. Otherwise the coordinator handler is registered and the
/// endpoint's ALPNs are refreshed.
pub(crate) async fn promote_to_coordinator(&self, network: &str) {
    // The map guard must be gone before `register_coordinator_handler`, which
    // takes a mutable reference to the same entry.
    let (state, invite_lock, dht_notify, network_key, disconnect_tx, cancel) = {
        let Some(handle) = self.networks.get(network) else {
            return;
        };
        if !should_promote(handle.role.clone()) {
            return;
        }
        (
            handle.state.clone(),
            handle.invite_lock.clone(),
            handle.dht_notify.clone(),
            handle.network_key,
            handle.disconnect_tx.clone(),
            handle.cancel.clone(),
        )
    };

    self.register_coordinator_handler(
        network,
        state,
        invite_lock,
        dht_notify,
        network_key,
        disconnect_tx,
        cancel,
    );
    self.refresh_alpns().await;
    tracing::info!(network, "promoted to coordinator accept handler");
}
pub(crate) fn check_authorized(req: &IpcMessage, peer_cred: Option<(u32, u32)>) -> Option<IpcMessage> {
if matches!(
req,
IpcMessage::Status
| IpcMessage::Report
| IpcMessage::FirewallShow
| IpcMessage::FirewallSuggestions { .. }
| IpcMessage::FirewallPending { .. }
| IpcMessage::FirewallSshShow
| IpcMessage::ListFiles
| IpcMessage::Connections
| IpcMessage::ContactId
| IpcMessage::Ping { .. }
| IpcMessage::Netcheck
) {
return None;
}
let uid = peer_cred.map(|(uid, _)| uid);
if uid == Some(0) {
return None;
}
if matches!(req, IpcMessage::SetOperator { .. }) {
return Some(IpcMessage::Error {
message: "permission denied: granting operator access requires root \
(re-run with sudo)"
.to_string(),
});
}
let operator = config::load().ok().and_then(|c| c.operator_uid);
if uid.is_some() && uid == operator {
return None;
}
Some(IpcMessage::Error {
message: "permission denied: this user is not authorized to control rayfish.\n\
Grant access with: sudo ray set-operator <user>"
.to_string(),
})
}
pub(crate) fn set_operator(&self, uid: u32) -> IpcMessage {
let mut app_config = match config::load() {
Ok(c) => c,
Err(e) => {
return IpcMessage::Error {
message: format!("failed to load config: {e}"),
};
}
};
app_config.operator_uid = Some(uid);
if let Err(e) = config::save_settings(&app_config) {
return IpcMessage::Error {
message: format!("failed to save config: {e}"),
};
}
IpcMessage::Ok {
message: format!("operator set to uid {uid}; that user can now run ray without sudo"),
}
}
/// Entry point for one IPC request.
///
/// The request is first checked with `check_authorized`; a denial is returned
/// as-is. Otherwise it is dispatched to the matching handler method and that
/// handler's reply is returned. Message variants without a handler here
/// produce an `IpcMessage::Error`.
pub(crate) async fn handle_request(
self: &Arc<Self>,
req: IpcMessage,
peer_cred: Option<(u32, u32)>,
) -> IpcMessage {
if let Some(denied) = Self::check_authorized(&req, peer_cred) {
return denied;
}
match req {
// Network lifecycle. The `transport` field is ignored here.
IpcMessage::Create {
mode,
name,
hostname,
transport: _,
} => self.create_network(mode, name, hostname).await,
IpcMessage::Join {
network_key,
name,
hostname,
transport: _,
invite,
coordinator,
auto_accept_firewall,
} => {
self.join_network(
&network_key,
name.as_deref(),
hostname,
invite,
coordinator,
auto_accept_firewall,
)
.await
}
IpcMessage::Leave { name } => self.leave_network(&name).await,
IpcMessage::Nuke { name, force } => self.nuke_network(&name, force).await,
// Daemon status and lifecycle.
IpcMessage::Status => self.status(),
IpcMessage::Report => self.build_report(peer_cred),
IpcMessage::Up { hostname } => self.activate(hostname).await,
IpcMessage::Down => self.deactivate().await,
IpcMessage::Shutdown => {
// Only signals shutdown; the reply is sent before the daemon stops.
self.shutdown_token.cancel();
IpcMessage::Ok {
message: "shutting down".to_string(),
}
}
// Firewall management.
IpcMessage::FirewallAdd {
direction,
action,
protocol,
port,
peer,
network,
} => self.firewall_add(
direction,
action,
protocol,
port.as_deref(),
peer.as_deref(),
network.as_deref(),
),
IpcMessage::FirewallRemove { index } => self.firewall_remove(index),
IpcMessage::FirewallShow => self.firewall_show(),
IpcMessage::FirewallDefault { action } => self.firewall_default(action),
IpcMessage::FirewallReject { enabled } => self.firewall_reject(enabled),
IpcMessage::FirewallSuggest {
network,
suggestions,
} => self.firewall_suggest(&network, suggestions).await,
IpcMessage::FirewallSuggestions { network } => self.firewall_suggestions(&network),
IpcMessage::FirewallPending { network } => self.firewall_pending(&network),
IpcMessage::FirewallAccept { network } => self.firewall_accept(&network),
IpcMessage::FirewallDeny { network } => self.firewall_deny(&network),
IpcMessage::FirewallResolveSuggestions {
network,
accept,
deny,
} => self.firewall_resolve_suggestions(&network, &accept, &deny),
IpcMessage::FirewallAutoAccept { network, enabled } => {
self.firewall_auto_accept(&network, enabled)
}
IpcMessage::FirewallSshSet { enabled } => self.firewall_ssh_set(enabled),
IpcMessage::FirewallSshAllow {
network,
peer,
users,
allow,
} => self.firewall_ssh_allow(&network, &peer, users, allow).await,
IpcMessage::FirewallSshShow => self.firewall_ssh_show(),
// Hostname, file transfer and device pairing.
IpcMessage::SetHostname { network, hostname } => {
self.set_hostname(&network, &hostname).await
}
IpcMessage::SendFile { path, peer } => self.send_file(&path, &peer).await,
IpcMessage::ListFiles => self.list_files(),
IpcMessage::AcceptFile { id, output } => self.accept_file(id, output, peer_cred).await,
IpcMessage::StartPairing => self.start_pairing(),
IpcMessage::PairWithDevice {
endpoint_id,
secret,
} => self.pair_with_device(endpoint_id, secret).await,
IpcMessage::SetOperator { uid } => self.set_operator(uid),
// Invites, join requests and admins.
IpcMessage::InviteCreate {
network,
expires_secs,
hostname,
reusable,
} => {
self.invite_create(&network, expires_secs, hostname, reusable)
.await
}
IpcMessage::InviteList { network } => self.invite_list(&network).await,
IpcMessage::InviteRevoke { network, id } => self.invite_revoke(&network, &id).await,
IpcMessage::Requests { network } => self.list_requests(&network),
IpcMessage::AcceptRequest { network, id } => self.accept_request(&network, &id).await,
IpcMessage::DenyRequest { network, id } => self.deny_request(&network, &id),
IpcMessage::AdminAdd { network, identity } => self.admin_add(&network, &identity).await,
IpcMessage::AdminList { network } => self.admin_list(&network),
// Contact-based connections and diagnostics.
IpcMessage::Connect {
contact_id,
hostname,
} => self.connect(&contact_id, hostname).await,
IpcMessage::Connections => self.list_connections(),
IpcMessage::ApproveConnection { id } => self.approve_connection(&id).await,
IpcMessage::ContactId => IpcMessage::ContactIdResponse {
contact_id: self.contact_public.to_string(),
},
IpcMessage::RotateContact => self.rotate_contact().await,
IpcMessage::Ping {
peer,
count,
interval_ms,
} => self.ping(&peer, count, interval_ms).await,
IpcMessage::Netcheck => self.netcheck().await,
// Any other variant has no handler here and is reported as an error.
other => IpcMessage::Error {
message: format!("unexpected message: {:?}", other),
},
}
}
/// Changes this node's hostname on `network`.
///
/// Returns `IpcMessage::Error` when the hostname is invalid or the network
/// is unknown, otherwise `IpcMessage::Ok` naming the hostname actually
/// applied and its DNS name.
///
/// On a coordinator the requested name is passed through
/// `hostname::resolve_collision` against other members' names, then the
/// snapshot is republished and a member sync is broadcast. On a member the
/// name is used as given and announced via `announce_rename_to_peers`.
pub(crate) async fn set_hostname(&self, network: &str, hostname: &str) -> IpcMessage {
use crate::hostname;
if !hostname::is_valid_hostname(hostname) {
return IpcMessage::Error {
message: "invalid hostname (lowercase ASCII, 1-63 chars)".to_string(),
};
}
// Copy what is needed out of the handle; the map guard is a temporary of
// this statement and is released before any await.
let (my_ip, is_coord, state, dht_notify) = match self.networks.get(network) {
Some(h) => (
h.my_ip,
h.role.is_coordinator(),
h.state.clone(),
h.dht_notify.clone(),
),
None => {
return IpcMessage::Error {
message: format!("network '{}' not found", network),
};
}
};
let my_identity = self.endpoint.id();
let new_hostname = if is_coord {
let taken = state.read().unwrap().taken_hostnames(my_identity);
let taken_refs: Vec<&str> = taken.iter().map(|s| s.as_str()).collect();
hostname::resolve_collision(hostname, &taken_refs)
} else {
hostname.to_string()
};
// Update this node's own roster entry, if it has one. A poisoned lock is
// skipped here instead of panicking.
if let Ok(mut s) = state.write()
&& let Some(me) = s.members.get_mut(&my_identity)
{
me.hostname = Some(new_hostname.clone());
}
// Replace the DNS entry for this node's IP with the new name.
dns::remove_hostname_by_ip(&self.hostname_table, &self.reverse_table, network, my_ip).await;
dns::update_hostname(
&self.hostname_table,
&self.reverse_table,
network,
&new_hostname,
my_ip,
derive_ipv6(&self.identity.local_identity()),
)
.await;
// Persist the name in the network's config when it loads; load and save
// failures are ignored. On a member the name is also stored as
// `pending_hostname` (presumably until the coordinator has seen it; confirm).
if let Ok(Some(mut net)) = config::load_network(network) {
net.my_hostname = Some(new_hostname.clone());
net.pending_hostname = if is_coord {
None
} else {
Some(new_hostname.clone())
};
let _ = config::save_network(&net);
}
if is_coord {
tracing::info!(
network = %network,
hostname = %new_hostname,
"coordinator renamed self; republishing blob + broadcasting MemberSync"
);
update_snapshot_and_publish(&state, &self.blob_store, &dht_notify).await;
broadcast_member_sync(&self.peers, None).await;
} else {
self.announce_rename_to_peers(network, my_identity, my_ip, &new_hostname)
.await;
}
let dns_name = format!("{}.{}.{}", new_hostname, network, crate::DNS_DOMAIN);
IpcMessage::Ok {
message: format!("hostname set to {} ({})", new_hostname, dns_name),
}
}
/// Sends a `MeshHello` carrying `new_hostname` to every peer of `network`
/// that currently has a live connection.
///
/// Delivery is best effort: a peer whose stream cannot be opened, or whose
/// send fails, is skipped and simply not counted in the final debug log.
async fn announce_rename_to_peers(
    &self,
    network: &str,
    my_identity: EndpointId,
    my_ip: Ipv4Addr,
    new_hostname: &str,
) {
    let targets = self.peers.peers_for_network_with_conn(network);
    let target_count = targets.len();
    tracing::info!(
        network = %network,
        hostname = %new_hostname,
        connected_peers = target_count,
        "member rename queued as pending intent; sending MeshHello to connected peers"
    );

    // The announcement is identical for every peer, so build it once.
    let hello = ControlMsg::MeshHello {
        identity: my_identity,
        ip: my_ip,
        hostname: Some(new_hostname.to_string()),
        device_cert: self.device_cert.clone(),
    };

    let mut delivered = 0usize;
    for (_, _, conn) in &targets {
        // `_recv` stays bound so the receive half lives until the send is done.
        let Ok((mut send, _recv)) = conn.open_bi().await else {
            continue;
        };
        if control::send_msg(&mut send, &hello).await.is_ok() {
            delivered += 1;
        }
    }

    tracing::debug!(
        network = %network,
        hostname = %new_hostname,
        sent = delivered,
        connected_peers = target_count,
        "fast-path rename MeshHello delivered; drain backstop covers the rest"
    );
}
/// Resolves a short identity prefix to a full `EndpointId` by scanning the
/// member lists of every active network.
///
/// * `"self"` resolves to this node's own endpoint id.
/// * An empty string resolves to `None`.
/// * Otherwise the first member (in iteration order) whose stringified
///   identity starts with `short` is returned; `None` if nothing matches.
pub(crate) fn resolve_short_id_any_network(&self, short: &str) -> Option<EndpointId> {
    if short == "self" {
        return Some(self.endpoint.id());
    }
    // `str::starts_with("")` is always true, so an empty prefix would
    // otherwise silently resolve to whichever member is iterated first.
    if short.is_empty() {
        return None;
    }
    for entry in self.networks.iter() {
        let state = entry.value().state.read().unwrap();
        if let Some(m) = state
            .members
            .all()
            .iter()
            .find(|m| m.identity.to_string().starts_with(short))
        {
            return Some(m.identity);
        }
    }
    None
}
#[allow(clippy::result_large_err)]
pub(crate) fn coordinator_handle(
&self,
network: &str,
) -> std::result::Result<(EndpointId, Arc<tokio::sync::Mutex<()>>), IpcMessage> {
let Some(handle) = self.networks.get(network) else {
return Err(IpcMessage::Error {
message: format!("network '{network}' not active"),
});
};
if !handle.role.is_coordinator() {
return Err(IpcMessage::Error {
message: format!("only the coordinator of '{network}' can manage invites/requests"),
});
}
Ok((handle.network_key, handle.invite_lock.clone()))
}
}
/// Guesses a MIME type from `filename` via `mime_guess`, using the
/// octet-stream fallback when there is no guess, and returns it as a string.
fn guess_mime_type(filename: &str) -> String {
    let guess = mime_guess::from_path(filename);
    let mime = guess.first_or_octet_stream();
    mime.to_string()
}
fn format_size(bytes: u64) -> String {
humansize::format_size(bytes, humansize::BINARY)
}
/// Reads log files from the log directory for inclusion in a bundle.
///
/// Only files named `panic.log` or starting with `rayfish.log` are considered.
/// They are visited in descending path order and collected as
/// `("logs/<file name>", contents)` pairs. Collection stops after the file
/// that brings the running total to `MAX_TOTAL` bytes or more, so the result
/// can exceed the cap by up to one file. Unreadable files are skipped; an
/// unreadable directory yields an empty list.
fn collect_recent_logs() -> Vec<(String, Vec<u8>)> {
    const MAX_TOTAL: u64 = 3 * 1024 * 1024;

    let is_log_file = |p: &std::path::PathBuf| {
        matches!(
            p.file_name().and_then(|n| n.to_str()),
            Some(n) if n.starts_with("rayfish.log") || n == "panic.log"
        )
    };

    let dir = crate::logdir::log_dir();
    let Ok(listing) = std::fs::read_dir(&dir) else {
        return Vec::new();
    };
    let mut candidates: Vec<std::path::PathBuf> = listing
        .filter_map(|e| e.ok())
        .map(|e| e.path())
        .filter(is_log_file)
        .collect();
    // Descending path order.
    candidates.sort_by(|a, b| b.cmp(a));

    let mut bundle = Vec::new();
    let mut collected = 0u64;
    for path in candidates {
        let Ok(contents) = std::fs::read(&path) else {
            continue;
        };
        collected += contents.len() as u64;
        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
            bundle.push((format!("logs/{name}"), contents));
        }
        if collected >= MAX_TOTAL {
            break;
        }
    }
    bundle
}
/// Writes `files` (name, contents) into a gzip-compressed tar archive at `path`.
///
/// Every entry gets a GNU header with its exact size and mode `0o644`.
/// Any I/O error from creating the file, appending an entry, or finishing
/// the tar/gzip streams is returned to the caller.
fn write_bundle(path: &std::path::Path, files: &[(String, Vec<u8>)]) -> std::io::Result<()> {
    let sink = std::fs::File::create(path)?;
    let gzip = flate2::write::GzEncoder::new(sink, flate2::Compression::default());
    let mut archive = tar::Builder::new(gzip);

    for entry in files {
        let (name, payload) = (&entry.0, &entry.1);
        let mut header = tar::Header::new_gnu();
        header.set_size(payload.len() as u64);
        header.set_mode(0o644);
        archive.append_data(&mut header, name, payload.as_slice())?;
    }

    // Finish the tar stream first, then the gzip stream wrapping the file.
    let gzip = archive.into_inner()?;
    gzip.finish()?;
    Ok(())
}
/// Daemon entry point: checks for a CGNAT conflict, builds the daemon,
/// connects all networks, activates it, and then serves IPC until
/// `serve_ipc` returns. The endpoint is closed before the IPC result is
/// handed back.
pub async fn run_daemon(token: CancellationToken, stats: Arc<ForwardMetrics>) -> Result<()> {
    check_cgnat_conflict()?;

    // The metrics server handle is kept bound (not discarded with `_`) so it
    // is only dropped when this function returns.
    let (daemon, _metrics_keepalive, promote_rx) = build_daemon(token.clone(), stats).await?;

    daemon.connect_all_networks().await;
    daemon.activate(None).await;

    let ipc_outcome = serve_ipc(&daemon, promote_rx, token).await;
    daemon.endpoint.close().await;
    ipc_outcome
}
/// Builds the ALPN list the endpoint is created with: one network ALPN per
/// configured network that has a public key, followed by the blobs, files,
/// pairing and connect ALPNs, in that order.
fn initial_alpns(app_config: &config::AppConfig) -> Vec<Vec<u8>> {
    let per_network = app_config
        .networks
        .iter()
        .filter_map(|net| net.network_public_key.as_ref())
        .map(transport::network_alpn);

    let fixed = [
        iroh_blobs::protocol::ALPN.to_vec(),
        transport::FILES_ALPN.to_vec(),
        PAIR_ALPN.to_vec(),
        transport::CONNECT_ALPN.to_vec(),
    ];

    per_network.chain(fixed).collect()
}
/// Constructs the shared `DaemonState` and starts the background tasks it
/// depends on.
///
/// Returns the daemon, the metrics server handle (if it could be started) and
/// the receiving end of the coordinator-promotion channel.
///
/// # Errors
/// Fails if the identity, device certificate, collision index or config cannot
/// be loaded, if the endpoint cannot be created, if the blob store directory or
/// store cannot be opened, or if the TUN device cannot be created.
async fn build_daemon(
token: CancellationToken,
stats: Arc<ForwardMetrics>,
) -> Result<(
Arc<DaemonState>,
Option<iroh_metrics::service::MetricsServer>,
mpsc::Receiver<String>,
)> {
// --- Identity and addressing ---
config::migrate_location();
let key = identity::load_or_create()?;
let public_key = key.public();
let device_cert = identity::load_device_cert()?;
if let Some(ref cert) = device_cert {
tracing::info!(user = %cert.user_identity.fmt_short(), "loaded device certificate");
}
let collision_index = identity::load_collision_index()?;
let identity = IrohIdentityProvider::new(public_key, collision_index);
let my_ip = identity.local_ip();
forward::init_ssh_nat(
my_ip,
derive_ipv6(&identity.local_identity()),
crate::ssh::SSH_LISTEN_PORT,
);
// --- Configuration ---
let mut app_config = config::load()?;
dht::set_discovery_override(&app_config.discovery_dns);
// `contact_secret` takes the config mutably, and the settings are saved right
// after; a save failure is logged and startup continues.
let contact_public = config::contact_secret(&mut app_config).public();
if let Err(e) = config::save_settings(&app_config) {
tracing::warn!(error = %e, "failed to persist contact key");
}
// --- Endpoint ---
let alpns = initial_alpns(&app_config);
// Tor is requested for the endpoint if any configured network uses a Tor transport.
let use_tor = app_config
.networks
.iter()
.any(|net| net.transport.as_ref().is_some_and(|t| t.is_tor()));
let ep = transport::create_endpoint_with_alpns(
key.clone(),
alpns,
use_tor,
&app_config.relay,
&app_config.discovery_dns,
)
.await?;
// --- Blob store ---
let blobs_dir = config::config_dir()?.join("blobs");
std::fs::create_dir_all(&blobs_dir)?;
let blob_store = FsStore::load(&blobs_dir)
.await
.context("failed to open blob store")?;
let blobs_proto = BlobsProtocol::new(&blob_store, None);
// --- TUN device ---
let my_ipv6 = derive_ipv6(&identity.local_identity());
let (tun_reader, tun_writer, tun_name) = tun::create(my_ip, my_ipv6)
.await
.context("failed to create TUN device")?;
// --- Peer table (audited when the audit log can be opened) ---
let peers = match audit::AuditLog::open() {
Ok(log) => PeerTable::with_audit(Arc::new(log)),
Err(e) => {
tracing::warn!(error = %e, "failed to open audit log; peer events will not be audited");
PeerTable::new()
}
};
// --- Firewall (falls back to the default config on load failure) ---
let fw_config = firewall::load_firewall().unwrap_or_else(|e| {
tracing::warn!(error = %e, "failed to load firewall config, using defaults");
firewall::FirewallConfig::default()
});
let shared_firewall = SharedFirewall::new(fw_config);
shared_firewall.clone().spawn_evictor(token.clone());
// --- Forwarding plumbing ---
// `active` starts as false and is shared with the TUN writer task.
let active = Arc::new(AtomicBool::new(false));
let (tun_tx, tun_rx) = mpsc::channel::<Bytes>(256);
forward::spawn_tun_writer(tun_writer, tun_rx, active.clone());
let device_user_map = peers::DeviceUserMap::new();
let hostname_table = dns::new_hostname_table();
let reverse_table = dns::new_reverse_table();
let dns_resolver = std::sync::Arc::new(crate::dns_resolver::Resolver::new(
hostname_table.clone(),
reverse_table.clone(),
));
tokio::spawn(forward::run_mesh(
tun_reader,
peers.clone(),
shared_firewall.clone(),
token.clone(),
stats.clone(),
dns_resolver.clone(),
tun_tx.clone(),
));
// --- Optional mDNS discovery ---
let mdns_enabled = app_config.mdns_enabled;
if mdns_enabled {
spawn_mdns_discovery(&ep, token.clone());
} else {
tracing::info!("mDNS discovery disabled");
}
// --- Protocol router and daemon state ---
// The pairing secret slot starts empty and is shared between the router and the daemon.
let pairing_secret: Arc<std::sync::Mutex<Option<[u8; 32]>>> =
Arc::new(std::sync::Mutex::new(None));
let protocol_router = Arc::new(ProtocolRouter::new(
blobs_proto,
key.clone(),
pairing_secret.clone(),
));
let (promote_tx, promote_rx) = mpsc::channel::<String>(16);
let daemon = Arc::new(DaemonState {
endpoint: ep,
identity,
peers,
stats: stats.clone(),
start: Instant::now(),
tun_tx,
networks: Arc::new(DashMap::new()),
shutdown_token: token.clone(),
blob_store,
firewall: shared_firewall,
protocol_router: protocol_router.clone(),
hostname_table,
reverse_table,
mdns_enabled,
tun_name,
pairing_secret,
device_cert,
device_user_map,
contact_public,
active: active.clone(),
dns_configurator: Arc::new(std::sync::Mutex::new(None)),
resolver: dns_resolver.clone(),
dns_reassert_token: std::sync::Mutex::new(None),
ssh_authz: crate::ssh::new_authz(),
ssh_token: std::sync::Mutex::new(None),
promote_tx,
});
// --- Background services ---
protocol_router.spawn_accept_loop(daemon.endpoint.clone(), token.clone());
// NOTE(review): if the pkarr client cannot be created, the contact publisher
// is skipped without any log line.
if let Ok(pkarr_client) = dht::create_pkarr_client(&daemon.endpoint) {
spawn_contact_publisher(
pkarr_client,
daemon.endpoint.id(),
token.clone(),
);
}
let metrics_server =
spawn_metrics_server(stats, daemon.peers.clone(), &daemon.endpoint, token).await;
tracing::info!(ip = %my_ip, id = %daemon.endpoint.id().fmt_short(), "daemon started");
Ok((daemon, metrics_server, promote_rx))
}
/// Registers an mDNS address lookup (service name `rayfish`, advertising
/// enabled) on the endpoint and spawns a task that logs discovery events until
/// `token` is cancelled or the event stream ends.
///
/// Failure to build the mDNS lookup is logged and the function returns; failure
/// to obtain the endpoint's address lookup returns silently.
fn spawn_mdns_discovery(ep: &Endpoint, token: CancellationToken) {
let mdns = match iroh_mdns_address_lookup::MdnsAddressLookup::builder()
.service_name("rayfish")
.advertise(true)
.build(ep.id())
{
Ok(mdns) => mdns,
Err(e) => {
tracing::warn!(error = %e, "failed to start mDNS discovery");
return;
}
};
let Ok(lookups) = ep.address_lookup() else {
return;
};
lookups.add(mdns.clone());
tracing::info!("mDNS discovery enabled (advertising _rayfish._udp.local)");
// The spawned task only logs; it does not act on discovered or expired peers.
tokio::spawn(async move {
use futures::StreamExt;
let mut events = mdns.subscribe().await;
loop {
tokio::select! {
_ = token.cancelled() => break,
event = events.next() => match event {
Some(iroh_mdns_address_lookup::DiscoveryEvent::Discovered { endpoint_info, .. }) => {
tracing::info!(
peer = %endpoint_info.endpoint_id.fmt_short(),
"mDNS: peer discovered on LAN"
);
}
Some(iroh_mdns_address_lookup::DiscoveryEvent::Expired { endpoint_id }) => {
tracing::info!(
peer = %endpoint_id.fmt_short(),
"mDNS: peer left LAN"
);
}
// Stream ended: stop the task.
None => break,
// Any other event kind is ignored.
_ => {}
}
}
}
});
}
async fn spawn_metrics_server(
stats: Arc<ForwardMetrics>,
peers: PeerTable,
endpoint: &Endpoint,
token: CancellationToken,
) -> Option<iroh_metrics::service::MetricsServer> {
let mut registry = iroh_metrics::Registry::default();
registry.register(stats);
let peer_metrics = Arc::new(crate::stats::PeerMetrics::default());
registry.register(peer_metrics.clone());
peer_metrics.spawn_collector(peers, token);
registry.register_all(endpoint.metrics());
let metrics_addr: SocketAddr = ([0, 0, 0, 0], 9090).into();
match iroh_metrics::service::MetricsServer::spawn(metrics_addr, Arc::new(registry)).await {
Ok(server) => {
tracing::info!(addr = %server.local_addr(), "metrics server started");
Some(server)
}
Err(e) => {
tracing::warn!(error = %e, "failed to start metrics server (Prometheus export disabled)");
None
}
}
}
/// Binds the IPC Unix socket and runs the daemon's main loop until `token` is
/// cancelled.
///
/// The loop multiplexes three event sources: shutdown, coordinator-promotion
/// requests arriving on `promote_rx`, and incoming IPC connections (each handled
/// on its own spawned task).
///
/// # Errors
/// Fails if the socket's parent directory cannot be created, a stale socket file
/// cannot be removed, or the socket cannot be bound.
async fn serve_ipc(
daemon: &Arc<DaemonState>,
mut promote_rx: mpsc::Receiver<String>,
token: CancellationToken,
) -> Result<()> {
let socket_path = ipc::socket_path();
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
// Remove a leftover socket file so the bind below does not fail on it.
if socket_path.exists() {
std::fs::remove_file(&socket_path)?;
}
let listener = UnixListener::bind(&socket_path).context("failed to bind IPC socket")?;
set_socket_permissions(&socket_path);
tracing::info!(path = %socket_path.display(), "IPC socket listening");
loop {
tokio::select! {
// Shutdown: deactivate the daemon, remove the socket file (best effort), exit.
_ = token.cancelled() => {
tracing::info!("daemon shutting down");
daemon.deactivate().await;
let _ = std::fs::remove_file(&socket_path);
return Ok(());
}
// A `None` from `recv()` does not match this pattern, so this branch is
// skipped for that iteration rather than ending the loop.
Some(net) = promote_rx.recv() => {
daemon.promote_to_coordinator(&net).await;
}
result = listener.accept() => match result {
Ok((stream, _)) => {
let daemon = daemon.clone();
// One task per client; client errors are only logged at debug level.
tokio::spawn(async move {
if let Err(e) = handle_ipc_client(stream, &daemon).await {
tracing::debug!(error = %e, "IPC client error");
}
});
}
// NOTE(review): accept errors are logged and the loop retries immediately,
// with no delay.
Err(e) => tracing::warn!(error = %e, "IPC accept error"),
}
}
}
}
/// Sets the IPC socket at `path` to mode `0666` so any local user can connect.
///
/// Uses the safe standard-library call instead of a raw `chmod`, and checks
/// its result: the success line is logged only when the mode was actually
/// applied, and a failure is logged as a warning. The function never panics
/// and never returns an error; a socket whose mode could not be changed simply
/// keeps the mode it was created with.
fn set_socket_permissions(path: &std::path::Path) {
    use std::os::unix::fs::PermissionsExt;

    let mode = std::fs::Permissions::from_mode(0o666);
    match std::fs::set_permissions(path, mode) {
        Ok(()) => {
            tracing::info!("IPC socket mode 0666 (per-request authorization via peer creds)");
        }
        Err(e) => {
            tracing::warn!(
                error = %e,
                path = %path.display(),
                "failed to set IPC socket mode 0666"
            );
        }
    }
}
/// Serves exactly one request/response exchange on an IPC connection.
///
/// The peer's (uid, gid) is read from the socket when available and passed
/// to the request handler; if it cannot be read, `None` is passed instead.
///
/// # Errors
/// Propagates failures from receiving the request or sending the response.
async fn handle_ipc_client(stream: UnixStream, daemon: &Arc<DaemonState>) -> Result<()> {
    let peer_cred = match stream.peer_cred() {
        Ok(cred) => Some((cred.uid(), cred.gid())),
        Err(_) => None,
    };

    let mut channel = ipc::framed(stream);
    let request = ipc::recv(&mut channel).await?;
    let response = daemon.handle_request(request, peer_cred).await;
    ipc::send(&mut channel, response).await?;
    Ok(())
}
#[allow(clippy::too_many_arguments)]
fn spawn_network_publisher(
client: PkarrRelayClient,
net_secret_key: SecretKey,
state: SharedNetworkState,
endpoint_id: EndpointId,
peers: PeerTable,
network_name: String,
notify: Arc<tokio::sync::Notify>,
token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let hash = {
let s = state.read().unwrap();
s.snapshot
.as_ref()
.map(|snap| snap.hash)
.unwrap_or_else(|| {
group_blob_hash(
&s.members,
&s.approved,
&s.suggested_firewall,
s.network_name.as_deref(),
&s.reusable_keys,
)
})
};
let mut seed_peers: Vec<EndpointId> = peers
.peers_for_network(&network_name)
.into_iter()
.map(|(id, _)| id)
.collect();
seed_peers.push(endpoint_id);
seed_peers.sort_by_key(|id| id.to_string());
seed_peers.dedup();
match dht::publish_network(&client, &net_secret_key, &hash, &seed_peers).await {
Ok(()) => tracing::info!(peers = seed_peers.len(), "published network record"),
Err(e) => tracing::warn!(error = %e, "failed to publish network record"),
}
tokio::select! {
_ = token.cancelled() => break,
_ = notify.notified() => {},
_ = tokio::time::sleep(Duration::from_secs(300)) => {},
}
}
})
}
fn spawn_contact_publisher(
client: PkarrRelayClient,
endpoint_id: EndpointId,
token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let secret = config::load().ok().and_then(|c| c.contact_secret_key);
if let Some(secret) = secret {
match dht::publish_contact(&client, &secret, endpoint_id).await {
Ok(()) => {
tracing::debug!(contact = %secret.public().fmt_short(), "published contact record")
}
Err(e) => tracing::warn!(error = %e, "failed to publish contact record"),
}
}
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(150)) => {},
}
}
})
}
#[allow(clippy::too_many_arguments)]
fn spawn_lazy_publisher(
client: PkarrRelayClient,
net_secret_key: SecretKey,
state: SharedNetworkState,
endpoint_id: EndpointId,
peers: PeerTable,
network_name: String,
token: CancellationToken,
) -> JoinHandle<()> {
const LAZY_PUBLISH_INTERVAL: Duration = Duration::from_secs(10);
tokio::spawn(async move {
let mut last_hash: Option<blake3::Hash> = None;
loop {
let hash = {
let s = state.read().unwrap();
s.snapshot
.as_ref()
.map(|snap| snap.hash)
.unwrap_or_else(|| {
group_blob_hash(
&s.members,
&s.approved,
&s.suggested_firewall,
s.network_name.as_deref(),
&s.reusable_keys,
)
})
};
if last_hash != Some(hash) {
let mut seed_peers: Vec<EndpointId> = peers
.peers_for_network(&network_name)
.into_iter()
.map(|(id, _)| id)
.collect();
seed_peers.push(endpoint_id);
seed_peers.sort_by_key(|id| id.to_string());
seed_peers.dedup();
match dht::publish_network(&client, &net_secret_key, &hash, &seed_peers).await {
Ok(()) => {
tracing::info!(
network = %network_name,
"lazy publisher: published network record"
);
last_hash = Some(hash);
}
Err(e) => tracing::warn!(error = %e, "lazy publisher: publish failed"),
}
}
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(LAZY_PUBLISH_INTERVAL) => {},
}
}
})
}
/// Turns the network's suggested firewall into concrete rules for this node
/// and either installs them or queues them for review.
///
/// Does nothing when this node has no hostname in the roster. When the
/// persisted config for `network_name` has `auto_accept_firewall` set, the
/// network's rules are replaced, saved, and the pending queue is cleared.
/// Otherwise, the rules not already installed for this network become the
/// new pending queue.
fn apply_suggested_firewall(
    firewall: &SharedFirewall,
    my_identity: EndpointId,
    network_name: &str,
    state: &std::sync::RwLock<NetworkState>,
) {
    let (suggestions, members): (SuggestedFirewall, Vec<Member>) = {
        let guard = state.read().unwrap();
        (guard.suggested_firewall.clone(), guard.roster())
    };

    let Some(my_hostname) = members
        .iter()
        .find(|m| m.identity == my_identity)
        .and_then(|m| m.hostname.clone())
    else {
        return;
    };

    // hostname -> identity; a later roster entry overwrites an earlier one
    // with the same hostname.
    let mut by_hostname: HashMap<&str, EndpointId> = HashMap::new();
    for member in &members {
        if let Some(h) = member.hostname.as_deref() {
            by_hostname.insert(h, member.identity);
        }
    }
    let resolve = |h: &str| by_hostname.get(h).copied();
    let rules =
        firewall::materialize_suggestions(network_name, &my_hostname, &suggestions, &resolve);

    // Defaults to false when the config is unreadable or the network is absent.
    let auto_accept = match config::load() {
        Ok(cfg) => cfg
            .networks
            .into_iter()
            .find(|n| n.name == network_name)
            .map(|n| n.auto_accept_firewall)
            .unwrap_or(false),
        Err(_) => false,
    };

    if !auto_accept {
        let installed: Vec<firewall::FirewallRule> = firewall
            .get_config()
            .rules
            .iter()
            .filter(|r| matches!(&r.origin, firewall::RuleOrigin::Network(n) if n == network_name))
            .cloned()
            .collect();
        let fresh: Vec<firewall::FirewallRule> = rules
            .into_iter()
            .filter(|r| !installed.contains(r))
            .collect();
        let count = fresh.len();
        state.write().unwrap().pending_suggestions = fresh;
        tracing::info!(
            network = network_name,
            count,
            "queued suggested firewall rules for review"
        );
        return;
    }

    let config = firewall.replace_network_rules(network_name, rules);
    if let Err(e) = firewall::save_firewall(&config) {
        tracing::warn!(error = %e, network = network_name, "failed to persist firewall config");
    }
    state.write().unwrap().pending_suggestions.clear();
    tracing::info!(
        network = network_name,
        "auto-accepted suggested firewall rules"
    );
}
/// Resolves the published network record for `net_pubkey`.
///
/// Returns the published hash and seed-peer list, or `None` when the pkarr
/// client cannot be created or the lookup fails.
async fn resolve_signed(
    endpoint: &Endpoint,
    net_pubkey: EndpointId,
) -> Option<(blake3::Hash, Vec<EndpointId>)> {
    let Ok(client) = dht::create_pkarr_client(endpoint) else {
        return None;
    };
    match dht::resolve_network(&client, net_pubkey).await {
        Ok(record) => Some(record),
        Err(_) => None,
    }
}
/// Tries to download the group blob identified by `signed` and returns it
/// once it passes `verify_group_blob` against that hash.
///
/// Candidates are the connected peers of `network_name` plus `seeds`, sorted
/// by string form and de-duplicated, tried one after another. Any failure
/// for a candidate (connect, fetch, read-back, verification) moves on to
/// the next one; `None` means every candidate failed.
async fn fetch_verified_blob(
    endpoint: &Endpoint,
    blob_store: &FsStore,
    peers: &PeerTable,
    signed: blake3::Hash,
    network_name: &str,
    seeds: &[EndpointId],
) -> Option<crate::membership::GroupBlob> {
    let blob_hash = iroh_blobs::Hash::from_bytes(*signed.as_bytes());

    let mut candidates: Vec<EndpointId> = peers
        .peers_for_network(network_name)
        .into_iter()
        .map(|(id, _)| id)
        .chain(seeds.iter().copied())
        .collect();
    candidates.sort_by_key(|id| id.to_string());
    candidates.dedup();

    for candidate in &candidates {
        let Ok(conn) = transport::connect_to_peer_with_alpn(
            endpoint,
            *candidate,
            iroh_blobs::protocol::ALPN,
        )
        .await
        else {
            continue;
        };
        let fetched = blob_store
            .remote()
            .fetch(conn, HashAndFormat::raw(blob_hash))
            .await;
        if fetched.is_err() {
            continue;
        }
        let Ok(bytes) = blob_store.blobs().get_bytes(blob_hash).await else {
            continue;
        };
        if let Ok(data) = crate::membership::verify_group_blob(&bytes, &signed) {
            return Some(data);
        }
    }
    None
}
/// Re-synchronizes local network state with the published network record.
///
/// Flow:
/// 1. Resolve the published record; give up (debug log) if unavailable.
/// 2. If `trusted_reconverge_hash(current, signed)` yields nothing, only drain a
///    pending rename against the current roster and return.
/// 3. Otherwise fetch and verify the blob, apply an IP tiebreak to its member
///    list, replace members / approved list / suggested firewall in `state`,
///    refresh the snapshot, then update DNS, apply the suggested firewall and
///    drain any pending rename.
#[allow(clippy::too_many_arguments)]
async fn reconverge_and_apply(
endpoint: &Endpoint,
ctx: &MeshCtx,
net_pubkey: EndpointId,
network_name: &str,
state: &SharedNetworkState,
my_identity: EndpointId,
alpn: &[u8],
my_ip: Ipv4Addr,
device_cert: &Option<control::DeviceCert>,
) {
let MeshCtx {
peers,
blob_store,
firewall,
hostname_table,
reverse_table,
..
} = ctx;
// Hash of the snapshot we currently hold, if any.
let current = state.read().unwrap().snapshot.as_ref().map(|s| s.hash);
let Some((signed, seeds)) = resolve_signed(endpoint, net_pubkey).await else {
tracing::debug!(network = %network_name, "reconverge: signed record unavailable");
return;
};
// NOTE(review): a `None` here presumably means there is nothing new to adopt —
// confirm against `trusted_reconverge_hash`. The pending rename is still
// drained in that case.
if crate::membership::trusted_reconverge_hash(current, signed).is_none() {
let roster = state.read().unwrap().roster();
drain_pending_rename(
endpoint,
&roster,
alpn,
network_name,
my_identity,
my_ip,
device_cert,
)
.await;
return;
}
let Some(data) =
fetch_verified_blob(endpoint, blob_store, peers, signed, network_name, &seeds).await
else {
tracing::warn!(network = %network_name, "reconverge: could not fetch verified blob");
return;
};
// A duplicate-IP validation failure after the tiebreak is only logged; the
// tiebroken list is applied regardless.
let tiebroken = crate::membership::resolve_ip_tiebreak(data.members.clone());
if let Err(e) = crate::membership::validate_no_duplicate_ips(&tiebroken) {
tracing::warn!(network = %network_name, error = %e, "roster still has duplicate IPs after tiebreak; applying tiebroken version");
}
// Swap in the new state under the write lock and take the roster out so the
// lock is released before the awaits below.
let roster = {
let mut s = state.write().unwrap();
s.members = MemberList::from_members(tiebroken);
s.approved = ApprovedList::from_entries(data.approved.clone());
s.suggested_firewall = data.suggested_firewall.clone();
s.refresh_snapshot();
s.roster()
};
apply_roster_to_dns(
&roster,
network_name,
my_identity,
hostname_table,
reverse_table,
)
.await;
apply_suggested_firewall(firewall, my_identity, network_name, state);
drain_pending_rename(
endpoint,
&roster,
alpn,
network_name,
my_identity,
my_ip,
device_cert,
)
.await;
tracing::info!(network = %network_name, "reconverged from signed record");
}
/// Returns the coordinators to dial, in order of preference.
///
/// `minter` comes first when it is a coordinator in `members` and is not
/// `me`; the remaining coordinators follow in roster order. `me` is never
/// included and no identity appears twice.
fn coordinator_dial_order(
    minter: EndpointId,
    members: &[Member],
    me: EndpointId,
) -> Vec<EndpointId> {
    let minter_is_coordinator = members
        .iter()
        .any(|m| m.is_coordinator && m.identity == minter);

    let mut order: Vec<EndpointId> = Vec::new();
    if minter_is_coordinator && minter != me {
        order.push(minter);
    }

    let other_coordinators = members
        .iter()
        .filter(|m| m.is_coordinator && m.identity != me)
        .map(|m| m.identity);
    for id in other_coordinators {
        if !order.contains(&id) {
            order.push(id);
        }
    }
    order
}
/// Lists the identities of every coordinator in `members` other than `me`,
/// in roster order.
fn gossip_targets(members: &[Member], me: EndpointId) -> Vec<EndpointId> {
    let mut targets = Vec::new();
    for member in members {
        if member.is_coordinator && member.identity != me {
            targets.push(member.identity);
        }
    }
    targets
}
/// Reports whether `peer` is listed as a coordinator in the current member
/// list.
///
/// # Panics
/// Panics if the state lock is poisoned.
fn sender_is_coordinator(state: &SharedNetworkState, peer: EndpointId) -> bool {
    let guard = state.read().unwrap();
    let listed_as_coordinator = guard
        .members
        .all()
        .iter()
        .any(|m| m.is_coordinator && m.identity == peer);
    listed_as_coordinator
}
/// Sends `msg` to every other coordinator of `network` that currently has a
/// live connection.
///
/// Best effort: peers whose stream cannot be opened are skipped and send
/// errors are ignored. Does nothing when there is no other coordinator.
async fn gossip_to_coordinators(
    peers: &PeerTable,
    network: &str,
    members: &[Member],
    me: EndpointId,
    msg: &ControlMsg,
) {
    let targets = gossip_targets(members, me);
    if targets.is_empty() {
        return;
    }

    for (peer_id, _, conn) in peers.peers_for_network_with_conn(network) {
        let is_target = targets.contains(&peer_id);
        if !is_target {
            continue;
        }
        let Ok((mut send, _)) = conn.open_bi().await else {
            continue;
        };
        let _ = control::send_msg(&mut send, msg).await;
    }
}
/// Result of one dial attempt, as consumed by [`pick_first_welcome`].
#[derive(Clone, Copy, PartialEq, Debug)]
#[allow(dead_code)]
enum DialOutcome {
    Welcomed,
    Denied,
    Unreachable,
}

/// Picks the first `Welcomed` outcome.
///
/// Returns `(index, true)` for the first `Welcomed` entry. When there is
/// none, returns `(last index, false)`; for an empty slice that is
/// `(0, false)`.
#[allow(dead_code)]
fn pick_first_welcome(outcomes: &[DialOutcome]) -> (usize, bool) {
    let first_welcome = outcomes
        .iter()
        .position(|outcome| matches!(outcome, DialOutcome::Welcomed));
    match first_welcome {
        Some(index) => (index, true),
        None => (outcomes.len().saturating_sub(1), false),
    }
}
/// Rebuilds a member list for `network_name` from the persisted config.
///
/// Only identity, IP, coordinator flag and hostname come from the config;
/// `user_identity` and `device_cert` are left empty and `collision_index` is
/// zero. Returns an empty list when the config cannot be loaded or does not
/// contain the network.
fn persisted_roster(network_name: &str) -> Vec<Member> {
    let Ok(cfg) = config::load() else {
        return Vec::new();
    };
    let Some(net) = cfg.networks.into_iter().find(|n| n.name == network_name) else {
        return Vec::new();
    };

    let mut roster = Vec::new();
    for saved in net.members {
        roster.push(Member {
            identity: saved.identity,
            ip: saved.ip,
            is_coordinator: saved.is_coordinator,
            hostname: saved.hostname,
            user_identity: None,
            device_cert: None,
            collision_index: 0,
        });
    }
    roster
}
/// Spawns a task that polls the published network record every 60 seconds and,
/// when its hash differs from the local snapshot, downloads the new group blob
/// from a connected peer and applies it.
///
/// Applying means: drop peers that are no longer members, stop polling entirely
/// if this node is neither a member nor on the approved list, otherwise replace
/// members / approved list / suggested firewall, refresh the snapshot and
/// re-apply the suggested firewall.
fn spawn_group_poller(
client: PkarrRelayClient,
net_pubkey: EndpointId,
state: SharedNetworkState,
endpoint: Endpoint,
ctx: MeshCtx,
network_name: String,
token: CancellationToken,
) -> JoinHandle<()> {
let MeshCtx {
peers,
blob_store,
firewall: fw,
..
} = ctx;
tokio::spawn(async move {
loop {
// The wait comes first, so the first poll happens 60 seconds after spawn.
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(60)) => {},
}
let current_hash = {
let s = state.read().unwrap();
s.snapshot.as_ref().map(|snap| snap.hash)
};
// NOTE(review): the seed peers from the record are not used here; only
// currently connected peers are asked for the blob.
let (remote_hash, _seed_peers) = match dht::resolve_network(&client, net_pubkey).await {
Ok(r) => r,
Err(e) => {
tracing::debug!(error = %e, "group poll failed");
continue;
}
};
if current_hash == Some(remote_hash) {
continue;
}
tracing::info!(old = ?current_hash, new = %remote_hash, "group blob changed");
let blob_hash = iroh_blobs::Hash::from_bytes(*remote_hash.as_bytes());
let peer_ids: Vec<EndpointId> = peers
.peers_for_network(&network_name)
.into_iter()
.map(|(id, _)| id)
.collect();
// Try each connected peer in turn until one yields a decodable blob.
let mut new_data = None;
for peer_id in &peer_ids {
let conn = match transport::connect_to_peer_with_alpn(
&endpoint,
*peer_id,
iroh_blobs::protocol::ALPN,
)
.await
{
Ok(c) => c,
Err(_) => continue,
};
if blob_store
.remote()
.fetch(conn, HashAndFormat::raw(blob_hash))
.await
.is_err()
{
continue;
}
// NOTE(review): the bytes are decoded with `decode_group_blob`, with no
// explicit check against `remote_hash` in this function — confirm the fetch
// by hash makes that check unnecessary.
match blob_store.blobs().get_bytes(blob_hash).await {
Ok(bytes) => match crate::membership::decode_group_blob(&bytes) {
Ok(data) => {
new_data = Some(data);
break;
}
Err(_) => continue,
},
Err(_) => continue,
}
}
let Some(data) = new_data else {
tracing::warn!("could not fetch updated group blob from any peer");
continue;
};
// Remove from the peer table every old member that is absent from the new blob.
let old_members: Vec<EndpointId> = {
let s = state.read().unwrap();
s.members.all().iter().map(|m| m.identity).collect()
};
let new_member_ids: std::collections::HashSet<EndpointId> =
data.members.iter().map(|m| m.identity).collect();
for old_id in &old_members {
if !new_member_ids.contains(old_id) {
let s = state.read().unwrap();
if let Some(member) = s.members.get(old_id) {
peers.remove(&member.ip, &derive_ipv6(old_id));
tracing::info!(peer = %old_id.fmt_short(), "removed kicked peer");
}
}
}
// If we are neither a member nor approved, the poller task ends here without
// applying the new state.
let my_id = endpoint.id();
if !new_member_ids.contains(&my_id)
&& !data.approved.iter().any(|a| a.identity == my_id)
{
tracing::warn!("we have been removed from the network");
break;
}
{
let mut s = state.write().unwrap();
s.members = MemberList::from_members(data.members.clone());
s.approved = ApprovedList::from_entries(data.approved.clone());
s.suggested_firewall = data.suggested_firewall.clone();
s.refresh_snapshot();
}
apply_suggested_firewall(&fw, endpoint.id(), &network_name, &state);
}
})
}
/// Everything the peer-cleanup task needs to prune a member who left on purpose.
struct CoordinatorCleanup {
// Shared network state holding the member list.
state: SharedNetworkState,
// Blob store used when the snapshot is republished.
blob_store: FsStore,
// Optional notifier passed along when the snapshot is republished.
dht_notify: Option<Arc<tokio::sync::Notify>>,
// Forward and reverse DNS tables the departing peer's hostname is removed from.
hostname_table: dns::HostnameTable,
reverse_table: dns::ReverseLookupTable,
// Resolves the disconnecting endpoint id to the member id that gets removed.
device_user_map: peers::DeviceUserMap,
// Name of the network this cleanup context applies to.
network_name: String,
}
/// Spawns a task that consumes disconnect events and removes the affected peer
/// from the peer table.
///
/// When `coordinator` is provided and the disconnect was intentional, the member
/// is also pruned from the roster, its hostname is removed from DNS, the
/// snapshot is republished and a member sync is broadcast.
///
/// The task ends when `token` is cancelled or the event channel is closed.
fn spawn_peer_cleanup(
mut disconnect_rx: mpsc::Receiver<forward::DisconnectEvent>,
peers: PeerTable,
token: CancellationToken,
coordinator: Option<CoordinatorCleanup>,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
_ = token.cancelled() => return,
event = disconnect_rx.recv() => {
match event {
Some(ev) => {
tracing::info!(peer = %ev.endpoint_id.fmt_short(), ip = %ev.ip, network = %ev.network, intentional = ev.intentional, "removing dead peer");
peers.remove_peer_from_network(&ev.ip, &ev.ipv6, &ev.network);
// Roster pruning happens only for intentional disconnects and only
// when coordinator context was supplied.
if ev.intentional && let Some(c) = &coordinator {
// The roster key is whatever the device-user map resolves the endpoint id to.
let member_id = c.device_user_map.resolve(&ev.endpoint_id);
// The write guard is a temporary of this statement, so it is released
// before the awaits below.
c.state.write().unwrap().members.remove(&member_id);
dns::remove_hostname_by_ip(
&c.hostname_table,
&c.reverse_table,
&c.network_name,
ev.ip,
)
.await;
update_snapshot_and_publish(&c.state, &c.blob_store, &c.dht_notify).await;
broadcast_member_sync(&peers, None).await;
tracing::info!(peer = %member_id.fmt_short(), "pruned member after leave");
}
}
// Channel closed: nothing more will arrive.
None => return,
}
}
}
}
})
}
/// Spawns the coordinator-side reader for control messages arriving on `conn`
/// from `remote_id`.
///
/// Each accepted bidirectional stream carries one message. Handled messages:
/// * `InviteShare` / `InviteUsed` — only honoured when the sender is listed as a
///   coordinator; they update the invite store under `invite_lock`.
/// * `Ping` — answered with a pong.
/// * `Pong` — completes the matching entry in `pending_pongs`.
/// * `MeshHello` — records a valid device certificate and processes the
///   requested hostname (collision resolution, DNS update, republish on change).
/// Any other message is ignored.
///
/// The task ends on cancellation, when accepting a stream fails, or when the
/// rate-limit gate asks to close the connection.
#[allow(clippy::too_many_arguments)]
fn spawn_coordinator_control_reader(
conn: Connection,
remote_id: EndpointId,
peer_ip: Ipv4Addr,
network_name: String,
state: SharedNetworkState,
ctx: MeshCtx,
dht_notify: Option<Arc<tokio::sync::Notify>>,
token: CancellationToken,
invite_lock: Arc<tokio::sync::Mutex<()>>,
pending_pongs: Arc<DashMap<u64, tokio::sync::oneshot::Sender<()>>>,
) {
let MeshCtx {
peers,
blob_store,
hostname_table,
reverse_table,
device_user_map,
..
} = ctx;
tokio::spawn(async move {
let mut gate = crate::ratelimit::ControlGate::new();
loop {
let accepted = tokio::select! {
_ = token.cancelled() => return,
r = conn.accept_bi() => r,
};
let mut recv = match accepted {
Ok((_send, recv)) => recv,
Err(_) => return, };
// NOTE(review): a stream whose message fails to decode is skipped before the
// rate-limit gate is consulted, so it is not counted by the gate.
let msg = match control::recv_msg(&mut recv).await {
Ok(m) => m,
Err(_) => continue,
};
match gate.check() {
crate::ratelimit::Verdict::Allow => {}
crate::ratelimit::Verdict::Drop => continue,
crate::ratelimit::Verdict::Close => {
tracing::warn!(peer = %remote_id.fmt_short(), "control-plane flood; closing connection");
conn.close(VarInt::from_u32(forward::ABUSE_CODE), b"control flood");
return;
}
}
match msg {
ControlMsg::InviteShare {
id,
secret_hash,
expires,
} => {
if !sender_is_coordinator(&state, remote_id) {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteShare from non-coordinator");
continue;
}
let Ok(hash) = String::from_utf8(secret_hash) else {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteShare with non-utf8 hash");
continue;
};
// Invite store access is serialized through `invite_lock`; load and record
// failures are ignored.
let _guard = invite_lock.lock().await;
if let Ok(mut store) = crate::invite::InviteStore::load(&network_name) {
let _ = store.record_shared(id, hash, expires);
}
continue;
}
ControlMsg::InviteUsed { secret_hash } => {
if !sender_is_coordinator(&state, remote_id) {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteUsed from non-coordinator");
continue;
}
let Ok(hash) = String::from_utf8(secret_hash) else {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteUsed with non-utf8 hash");
continue;
};
let _guard = invite_lock.lock().await;
if let Ok(mut store) = crate::invite::InviteStore::load(&network_name) {
let _ = store.burn_by_hash(&hash);
}
continue;
}
ControlMsg::Ping { nonce } => {
respond_pong(&conn, nonce).await;
continue;
}
ControlMsg::Pong { nonce } => {
// Wake whoever is waiting on this nonce, if anyone still is.
if let Some((_, tx)) = pending_pongs.remove(&nonce) {
let _ = tx.send(());
}
continue;
}
_ => {}
}
// Only MeshHello gets past this point. Its `identity` and `ip` fields are
// not read; `remote_id` and `peer_ip` are used instead.
let ControlMsg::MeshHello {
hostname,
device_cert,
..
} = msg
else {
continue;
};
// Accept the device certificate only if it verifies and its device key is the
// connection's remote id.
if let Some(ref cert) = device_cert
&& cert.verify()
&& cert.device_key == remote_id
{
{
let mut s = state.write().unwrap();
if let Some(m) = s.members.get_mut(&remote_id) {
m.user_identity = Some(cert.user_identity);
m.device_cert = Some(cert.clone());
}
}
device_user_map.insert(remote_id, cert.user_identity);
}
// A hello without a hostname carries nothing further to process.
let Some(desired) = hostname else { continue };
tracing::info!(
network = %network_name,
peer = %remote_id.fmt_short(),
desired = %desired,
"coordinator received MeshHello hostname"
);
// Resolve collisions against every other member's hostname and detect whether
// the resulting name differs from what is currently recorded for this peer.
let (final_hostname, changed) = {
let s = state.read().unwrap();
let taken: Vec<String> = s
.members
.all()
.iter()
.filter(|m| m.identity != remote_id)
.filter_map(|m| m.hostname.clone())
.collect();
let taken_refs: Vec<&str> = taken.iter().map(|s| s.as_str()).collect();
let final_hostname = crate::hostname::resolve_collision(&desired, &taken_refs);
let old = s
.members
.all()
.iter()
.find(|m| m.identity == remote_id)
.and_then(|m| m.hostname.clone());
let changed = old.as_deref() != Some(final_hostname.as_str());
(final_hostname, changed)
};
// NOTE(review): the read lock above is released before this write lock is
// taken, so the roster could change in between.
if changed {
let mut s = state.write().unwrap();
if let Some(m) = s.members.get_mut(&remote_id) {
m.hostname = Some(final_hostname.clone());
}
}
// DNS is refreshed whether or not the name changed.
dns::remove_hostname_by_ip(&hostname_table, &reverse_table, &network_name, peer_ip)
.await;
let ipv6 = derive_ipv6(&remote_id);
dns::update_hostname(
&hostname_table,
&reverse_table,
&network_name,
&final_hostname,
peer_ip,
ipv6,
)
.await;
// Republish and broadcast only on an actual change.
if changed {
tracing::info!(peer = %remote_id.fmt_short(), network = %network_name, hostname = %final_hostname, "peer hostname changed; republishing blob + broadcasting MemberSync");
update_snapshot_and_publish(&state, &blob_store, &dht_notify).await;
broadcast_member_sync(&peers, None).await;
} else {
tracing::debug!(peer = %remote_id.fmt_short(), network = %network_name, hostname = %final_hostname, "peer hostname unchanged; no republish (idempotent MeshHello)");
}
}
});
}
/// Picks the index of the path that should be reported for a connection.
///
/// Each entry of `classes` pairs a connection type with a "selected" flag.
/// Resolution order:
/// 1. the first entry whose flag is set;
/// 2. otherwise the first `Direct` entry, then the first `Relay`, then the
///    first `Tor`;
/// 3. otherwise index `0` when the slice is non-empty.
///
/// Returns `None` only when `classes` is empty.
fn choose_path_index(classes: &[(ipc::ConnType, bool)]) -> Option<usize> {
    // Fallback preference when no entry is flagged as selected.
    let preference = [
        ipc::ConnType::Direct,
        ipc::ConnType::Relay,
        ipc::ConnType::Tor,
    ];

    classes
        .iter()
        .position(|(_, selected)| *selected)
        .or_else(|| {
            preference
                .iter()
                .find_map(|want| classes.iter().position(|(kind, _)| kind == want))
        })
        // A type outside the preference list still beats reporting nothing.
        .or_else(|| (!classes.is_empty()).then_some(0))
}
/// Reports whether the hostname found in the blob (`blob`) fulfils the
/// locally requested name `pending`.
///
/// The request counts as satisfied when the blob name is exactly `pending`,
/// or is `pending` followed by `-` and one or more ASCII digits (for example
/// `alice-42` for `alice`). A missing blob name never satisfies a request.
fn rename_satisfied(pending: &str, blob: Option<&str>) -> bool {
    let Some(name) = blob else {
        return false;
    };
    if name == pending {
        return true;
    }
    // Anything else must look like "<pending>-<digits>".
    let Some(suffix) = name
        .strip_prefix(pending)
        .and_then(|rest| rest.strip_prefix('-'))
    else {
        return false;
    };
    !suffix.is_empty() && suffix.chars().all(|c| c.is_ascii_digit())
}
/// Re-delivers a locally queued hostname change to the network's coordinators.
///
/// Loads the persisted config for `network_name`; when it cannot be loaded or
/// holds no `pending_hostname`, this is a no-op. Otherwise every member of
/// `roster` flagged `is_coordinator` (other than `my_identity`) is dialed over
/// `alpn` and sent a `MeshHello` carrying the pending name together with
/// `my_identity`, `my_ip` and `device_cert`.
///
/// Delivery is best-effort: nothing is returned, and each failure is only
/// logged. Success is logged solely when the message was actually written, so
/// the log does not claim a delivery that did not happen.
async fn drain_pending_rename(
    endpoint: &Endpoint,
    roster: &[Member],
    alpn: &[u8],
    network_name: &str,
    my_identity: EndpointId,
    my_ip: Ipv4Addr,
    device_cert: &Option<control::DeviceCert>,
) {
    let pending = match config::load_network(network_name) {
        Ok(Some(net)) => net.pending_hostname,
        _ => None,
    };
    let Some(pending) = pending else {
        return;
    };

    let coordinators: Vec<&Member> = roster
        .iter()
        .filter(|m| m.is_coordinator && m.identity != my_identity)
        .collect();
    tracing::info!(
        network = %network_name,
        hostname = %pending,
        coordinators = coordinators.len(),
        "pending rename outstanding; delivering MeshHello to coordinator set"
    );
    if coordinators.is_empty() {
        tracing::warn!(
            network = %network_name,
            hostname = %pending,
            "no other coordinator in roster to deliver pending rename to; will retry on next reconverge/backstop"
        );
        return;
    }

    // The same message goes to every coordinator, so build it once.
    let hello = ControlMsg::MeshHello {
        identity: my_identity,
        ip: my_ip,
        hostname: Some(pending.clone()),
        device_cert: device_cert.clone(),
    };

    for m in coordinators {
        let conn = match transport::connect_to_peer_with_alpn(endpoint, m.identity, alpn).await {
            Ok(conn) => conn,
            Err(e) => {
                tracing::debug!(
                    network = %network_name,
                    coordinator = %m.identity.fmt_short(),
                    error = %e,
                    "could not reach coordinator to deliver pending rename; will retry"
                );
                continue;
            }
        };
        let (mut send, _recv) = match conn.open_bi().await {
            Ok(streams) => streams,
            Err(e) => {
                tracing::warn!(
                    network = %network_name,
                    coordinator = %m.identity.fmt_short(),
                    error = %e,
                    "could not open control stream to deliver pending rename; will retry"
                );
                continue;
            }
        };
        match control::send_msg(&mut send, &hello).await {
            Ok(_) => {
                tracing::info!(
                    network = %network_name,
                    coordinator = %m.identity.fmt_short(),
                    hostname = %pending,
                    "re-sent pending rename to coordinator"
                );
            }
            Err(e) => {
                tracing::warn!(
                    network = %network_name,
                    coordinator = %m.identity.fmt_short(),
                    error = %e,
                    "failed to send pending rename to coordinator; will retry"
                );
            }
        }
    }
}
/// Returns `true` when the persisted config for `network_name` loads
/// successfully and carries a `pending_hostname`.
///
/// A load error or a missing config both yield `false`.
fn has_pending_hostname(network_name: &str) -> bool {
    config::load_network(network_name)
        .ok()
        .flatten()
        .is_some_and(|net| net.pending_hostname.is_some())
}
/// Chooses the hostname to advertise for `network_name` in outgoing messages.
///
/// Prefers the persisted `pending_hostname` and falls back to `my_hostname`.
/// Yields `None` when neither is set, or when the config is missing or cannot
/// be loaded.
fn outgoing_hostname(network_name: &str) -> Option<String> {
    let stored = config::load_network(network_name).ok().flatten();
    stored.and_then(|net| net.pending_hostname.or(net.my_hostname))
}
/// Projects a roster onto the local DNS tables and reconciles the persisted
/// hostname state for this node.
///
/// Every member that has a hostname contributes a `(hostname, ipv4, ipv6)`
/// entry. If the persisted config for `network_name` loads, the local node's
/// rename state is then reconciled against the name the roster carries for
/// `my_identity`:
///
/// * a pending rename the roster does not yet reflect (see
///   [`rename_satisfied`]) is held: our own entry is replaced by the pending
///   name and `my_hostname` is persisted as that name, while the pending
///   intent stays queued;
/// * otherwise any pending intent is cleared and `my_hostname` is aligned with
///   the roster's name for us.
///
/// Finally the entries are handed to `dns::sync_network_hostnames`.
/// Persistence is best-effort: a failed save is logged and DNS is still synced.
async fn apply_roster_to_dns(
    members: &[Member],
    network_name: &str,
    my_identity: EndpointId,
    hostname_table: &dns::HostnameTable,
    reverse_table: &dns::ReverseLookupTable,
) {
    let mut entries: Vec<(String, Ipv4Addr, std::net::Ipv6Addr)> = members
        .iter()
        .filter_map(|m| {
            m.hostname
                .as_ref()
                .map(|h| (h.clone(), m.ip, derive_ipv6(&m.identity)))
        })
        .collect();
    // The hostname the roster currently records for this node, if any.
    let blob_self = members
        .iter()
        .find(|m| m.identity == my_identity)
        .and_then(|m| m.hostname.clone());

    if let Ok(Some(mut net)) = config::load_network(network_name) {
        match net.pending_hostname.clone() {
            Some(pending) if !rename_satisfied(&pending, blob_self.as_deref()) => {
                tracing::info!(
                    network = %network_name,
                    pending = %pending,
                    blob = blob_self.as_deref().unwrap_or("<none>"),
                    "rename still unconfirmed by signed blob; holding local name and keeping it queued for delivery"
                );
                if let Some(me) = members.iter().find(|m| m.identity == my_identity) {
                    // Swap whatever the roster says for our IP with the name we asked for.
                    let v6 = derive_ipv6(&my_identity);
                    entries.retain(|(_, v4, _)| *v4 != me.ip);
                    entries.push((pending.clone(), me.ip, v6));
                }
                if net.my_hostname.as_deref() != Some(pending.as_str()) {
                    net.my_hostname = Some(pending);
                    if let Err(e) = config::save_network(&net) {
                        tracing::warn!(
                            network = %network_name,
                            error = %e,
                            "failed to persist held hostname"
                        );
                    }
                }
            }
            pending => {
                let mut dirty = false;
                if let Some(p) = &pending {
                    tracing::info!(
                        network = %network_name,
                        requested = %p,
                        confirmed = blob_self.as_deref().unwrap_or("<none>"),
                        "rename confirmed by signed blob; clearing pending intent"
                    );
                    net.pending_hostname = None;
                    dirty = true;
                }
                // Adopt the roster's name for us, but never erase a stored
                // name just because the roster has none.
                if blob_self.is_some() && net.my_hostname != blob_self {
                    net.my_hostname = blob_self.clone();
                    dirty = true;
                }
                if dirty {
                    if let Err(e) = config::save_network(&net) {
                        tracing::warn!(
                            network = %network_name,
                            error = %e,
                            "failed to persist confirmed hostname state"
                        );
                    }
                }
            }
        }
    }
    dns::sync_network_hostnames(hostname_table, reverse_table, network_name, &entries).await;
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Rebuilds the network snapshot, stores its bytes in the blob store and wakes
/// the DHT notifier.
///
/// The snapshot is refreshed under the state's write lock and its
/// `msgpack_bytes` are copied out before any `.await`, so the lock is never
/// held across a suspension point. The result of adding the bytes to the blob
/// store is ignored. `dht_notify`, when present, is always signalled, even if
/// no snapshot was available.
async fn update_snapshot_and_publish(
    state: &SharedNetworkState,
    blob_store: &FsStore,
    dht_notify: &Option<Arc<tokio::sync::Notify>>,
) {
    let encoded = {
        let mut guard = state.write().unwrap();
        guard.refresh_snapshot();
        guard
            .snapshot
            .as_ref()
            .map(|snapshot| snapshot.msgpack_bytes.clone())
    };

    if let Some(payload) = encoded {
        let _ = blob_store.blobs().add_slice(&payload).await;
    }

    if let Some(wake) = dht_notify.as_ref() {
        wake.notify_one();
    }
}
/// Outcome of a successful run of `join_mesh_shared`.
enum JoinResult {
    /// The join completed; carries the live shared state for the network.
    Joined(SharedNetworkState),
    /// The remote side answered with `JoinPending`; no state was created.
    Pending,
}
/// Crate-visible outcome of a join attempt.
///
/// NOTE(review): the producer of this type is outside this section; presumably
/// it mirrors `JoinResult` for IPC callers — confirm against the call site.
pub(crate) enum TryJoin {
/// The join succeeded; carries an `IpcMessage` (presumably the reply to return to the client).
Joined(IpcMessage),
/// The join has not completed yet.
Pending,
}
/// Per-join inputs for `join_mesh_shared`, bundled to keep its argument list manageable.
struct JoinParams {
/// Hostname requested in the `JoinRequest`; also the fallback persisted when
/// the received roster has no hostname for this node.
my_hostname: Option<String>,
/// Public key of the network; persisted, stored in the live state, and used to
/// validate `AdminGrant` messages.
net_pubkey: EndpointId,
/// Device certificate attached to the `JoinRequest` and `MeshHello` messages.
device_cert: Option<control::DeviceCert>,
/// Invite secret forwarded in the `JoinRequest`.
invite_secret: Option<Vec<u8>>,
/// Firewall suggestions placed into the live network state.
suggested_firewall: SuggestedFirewall,
/// Reusable keys placed into the live network state.
reusable_keys: BTreeMap<String, crate::membership::ReusableKey>,
/// Persisted into the network config as `auto_accept_firewall`.
auto_accept_firewall: bool,
/// `true`: this side opens the stream and sends a `JoinRequest`.
/// `false`: this side accepts a stream and waits for the remote's message.
initial: bool,
}
/// Completes a join (or re-join) over `initial_conn` and brings up the member
/// side of the mesh for `network_name`.
///
/// Steps, in order:
/// 1. Handshake. With `params.initial` set, open a stream, send a
///    `JoinRequest` and wait up to 30 s for the answer; otherwise accept a
///    stream and read whatever the remote sends (`Welcome`, legacy
///    `JoinApproved`, or `MemberSync`, which triggers a fetch of the signed
///    roster with the persisted roster as fallback).
/// 2. Persist the resulting roster and settings via `config::save_network`.
/// 3. Register the remote peer, then dial every other roster member and send
///    each a `MeshHello`.
/// 4. Build the live `NetworkState`, refresh its snapshot, and apply the
///    suggested firewall.
/// 5. Spawn two background tasks: a reconverge worker and a control-message
///    loop reading from `initial_conn`.
///
/// Returns `JoinResult::Pending` when the remote answers `JoinPending`, and
/// `JoinResult::Joined` with the live state otherwise.
///
/// # Errors
/// Fails on stream/handshake errors, the 30 s response timeout, `JoinDenied`,
/// an unexpected message, an IP collision in the `Welcome` roster, or a config
/// load/save failure.
#[allow(clippy::too_many_arguments)]
async fn join_mesh_shared(
initial_conn: Connection,
ep: &Endpoint,
network_name: &str,
alpn: &[u8],
ctx: MeshCtx,
params: JoinParams,
disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
token: CancellationToken,
promote_tx: mpsc::Sender<String>,
invite_lock: Arc<tokio::sync::Mutex<()>>,
pending_pongs: Arc<DashMap<u64, tokio::sync::oneshot::Sender<()>>>,
) -> Result<JoinResult> {
// Keep a full clone of the context for the spawned workers before destructuring it.
let worker_ctx = ctx.clone();
let MeshCtx {
identity,
peers,
blob_store,
firewall,
..
} = ctx;
let JoinParams {
my_hostname,
net_pubkey,
device_cert,
invite_secret,
suggested_firewall,
reusable_keys,
auto_accept_firewall,
initial,
} = params;
let my_identity = identity.local_identity();
let my_ip = identity.local_ip();
// --- Step 1: handshake; yields the roster and the approved list. ---
let (members, approved) = if initial {
let (mut send, mut recv) = initial_conn
.open_bi()
.await
.context("open join control stream")?;
control::send_msg(
&mut send,
&ControlMsg::JoinRequest {
invite_secret,
hostname: my_hostname.clone(),
device_cert: device_cert.clone(),
},
)
.await
.context("send join request")?;
// Outer `?` handles the timeout, inner `?` the receive error.
let msg = tokio::time::timeout(Duration::from_secs(30), control::recv_msg(&mut recv))
.await
.context("timeout awaiting join response")??;
match msg {
ControlMsg::Welcome { members, approved } => {
tracing::info!(network = %network_name, "welcomed to network");
// Refuse to join if another identity already holds our IP.
if let Some(existing) = members
.iter()
.find(|m| m.ip == my_ip && m.identity != my_identity)
{
anyhow::bail!(
"IP collision: {} is already assigned to {}",
my_ip,
existing.identity
);
}
(members, approved)
}
ControlMsg::JoinPending => {
tracing::info!(network = %network_name, "join pending operator approval");
return Ok(JoinResult::Pending);
}
ControlMsg::JoinDenied { reason } => {
anyhow::bail!("join denied: {reason}");
}
other => {
anyhow::bail!("expected Welcome or JoinPending, got {other:?}");
}
}
} else {
// Non-initial path: the remote opens the stream and speaks first.
// NOTE(review): unlike the initial path, this receive has no timeout.
let (_send, mut recv) = initial_conn
.accept_bi()
.await
.context("accept control stream")?;
let msg = control::recv_msg(&mut recv).await?;
match msg {
ControlMsg::Welcome { members, approved } => {
tracing::info!(network = %network_name, "welcomed to network");
(members, approved)
}
ControlMsg::JoinApproved { your_ip, members } => {
tracing::info!(ip = %your_ip, network = %network_name, "joined network (legacy)");
(members, vec![])
}
ControlMsg::MemberSync => {
tracing::info!(network = %network_name, "reconnected via peer; reconverging from signed record");
// Prefer the verified signed roster; fall back to the persisted one
// (with an empty approved list) if resolution or fetch fails.
match resolve_signed(ep, net_pubkey).await {
Some((signed, seeds)) => {
match fetch_verified_blob(
ep,
&blob_store,
&peers,
signed,
network_name,
&seeds,
)
.await
{
Some(data) => (data.members, data.approved),
None => (persisted_roster(network_name), vec![]),
}
}
None => (persisted_roster(network_name), vec![]),
}
}
ControlMsg::JoinDenied { reason } => {
anyhow::bail!("join denied: {reason}");
}
other => {
anyhow::bail!("expected Welcome or MemberSync, got {other:?}");
}
}
};
// --- Step 2: persist roster and settings. ---
let member_entries = to_member_entries(members.iter());
let approved_config = to_approved_entries(approved.iter());
// The roster's hostname for us wins; the requested hostname is the fallback.
let persisted_hostname = members
.iter()
.find(|m| m.identity == my_identity)
.and_then(|m| m.hostname.clone())
.or(my_hostname.clone());
// Carry over settings from any existing config so the save below does not reset them.
let (direct, pending_hostname, ssh_allow) = config::load_network(network_name)?
.map(|n| (n.direct, n.pending_hostname, n.ssh_allow))
.unwrap_or((false, None, vec![]));
config::save_network(&config::NetworkConfig {
name: network_name.to_string(),
group_mode: GroupMode::Restricted,
my_ip: Some(my_ip),
my_hostname: persisted_hostname,
pending_hostname,
members: member_entries,
approved: approved_config,
network_secret_key: None,
network_public_key: Some(net_pubkey),
transport: None,
auto_accept_firewall,
admins: vec![],
direct,
ssh_allow,
})?;
// On the non-initial path no JoinRequest was sent, so introduce ourselves now.
// This runs after the save so `outgoing_hostname` reads the fresh config.
if !initial {
let (mut send, _recv) = initial_conn.open_bi().await?;
control::send_msg(
&mut send,
&ControlMsg::MeshHello {
identity: my_identity,
ip: my_ip,
hostname: outgoing_hostname(network_name),
device_cert: device_cert.clone(),
},
)
.await?;
}
// --- Step 3: register the remote peer and dial the rest of the roster. ---
let remote_id = initial_conn.remote_id();
let remote_ip = identity.derive_ip(&remote_id);
crate::spawn_path_logger(initial_conn.clone(), remote_id.fmt_short().to_string());
let remote_ipv6 = derive_ipv6(&remote_id);
peers.add(
remote_ip,
remote_ipv6,
initial_conn.clone(),
remote_id,
network_name,
);
forward::spawn_peer_reader(
initial_conn.clone(),
remote_id,
remote_ip,
remote_ipv6,
network_name.to_string(),
worker_ctx.forward_ctx(disconnect_tx.clone(), token.clone()),
);
for member in &members {
// Skip ourselves and the peer we are already connected to.
if member.identity == my_identity || member.identity == initial_conn.remote_id() {
continue;
}
match transport::connect_to_peer_with_alpn(ep, member.identity, alpn).await {
Ok(conn) => {
// NOTE(review): a connect failure below is only warned about, but the two
// `?` here abort the whole join if one peer's stream open or send fails,
// after config and the initial peer were already set up — confirm intended.
let (mut send, _recv) = conn.open_bi().await?;
control::send_msg(
&mut send,
&ControlMsg::MeshHello {
identity: my_identity,
ip: my_ip,
hostname: outgoing_hostname(network_name),
device_cert: device_cert.clone(),
},
)
.await?;
let member_ipv6 = derive_ipv6(&member.identity);
peers.add(
member.ip,
member_ipv6,
conn.clone(),
member.identity,
network_name,
);
forward::spawn_peer_reader(
conn,
member.identity,
member.ip,
member_ipv6,
network_name.to_string(),
worker_ctx.forward_ctx(disconnect_tx.clone(), token.clone()),
);
tracing::info!(peer_ip = %member.ip, "connected to mesh peer");
}
Err(e) => {
tracing::warn!(peer_ip = %member.ip, error = %e, "mesh peer unavailable");
}
}
}
// --- Step 4: build the live state and store its snapshot. ---
let live_state = {
let mut ns = NetworkState {
members: MemberList::from_members(members.clone()),
approved: ApprovedList::from_entries(approved),
snapshot: None,
network_secret_key: None,
network_public_key: net_pubkey,
network_name: Some(network_name.to_string()),
mode: GroupMode::Restricted,
suggested_firewall,
reusable_keys,
pending_suggestions: Vec::new(),
pending: HashMap::new(),
};
ns.refresh_snapshot();
// Best-effort: a failure to add the snapshot bytes is ignored.
if let Some(snap) = &ns.snapshot {
let _ = blob_store.blobs().add_slice(&snap.msgpack_bytes).await;
}
Arc::new(std::sync::RwLock::new(ns))
};
apply_suggested_firewall(&firewall, my_identity, network_name, &live_state);
// --- Step 5a: reconverge worker, woken by `reconverge_notify` or a 30 s tick. ---
let reconverge_notify = Arc::new(tokio::sync::Notify::new());
tokio::spawn({
let notify = reconverge_notify.clone();
let token = token.clone();
let live_state = live_state.clone();
let network_name = network_name.to_string();
let ctx_w = worker_ctx;
let endpoint_w = ep.clone();
let my_identity_w = my_identity;
let net_pubkey_w = net_pubkey;
let alpn_w = alpn.to_vec();
let my_ip_w = my_ip;
let device_cert_w = device_cert.clone();
async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = token.cancelled() => return,
_ = notify.notified() => {}
_ = tick.tick() => {
// The periodic tick only matters while a rename is still undelivered.
if !has_pending_hostname(&network_name) {
continue;
}
tracing::debug!(
network = %network_name,
"backstop tick: pending rename outstanding, reconverging to retry delivery"
);
}
}
// Wait 300 ms before reconverging (presumably to coalesce bursts of
// notifications), still honouring cancellation.
tokio::select! {
_ = token.cancelled() => return,
_ = tokio::time::sleep(std::time::Duration::from_millis(300)) => {}
}
reconverge_and_apply(
&endpoint_w, &ctx_w, net_pubkey_w,
&network_name, &live_state, my_identity_w,
&alpn_w, my_ip_w, &device_cert_w,
).await;
}
}
});
// --- Step 5b: control-message loop on the initial connection. ---
tokio::spawn({
let initial_conn = initial_conn.clone();
let token = token.clone();
let live_state = live_state.clone();
let network_name = network_name.to_string();
let peers_c = peers.clone();
let endpoint_c = ep.clone();
let my_identity_c = my_identity;
let net_pubkey_c = net_pubkey;
let promote_tx = promote_tx.clone();
let invite_lock = invite_lock.clone();
let reconverge_notify = reconverge_notify.clone();
let pending_pongs = pending_pongs.clone();
async move {
let mut gate = crate::ratelimit::ControlGate::new();
loop {
tokio::select! {
_ = token.cancelled() => return,
result = initial_conn.accept_bi() => {
match result {
Ok((_send, mut recv)) => {
// An unreadable message skips this stream without consulting the gate.
let msg = match control::recv_msg(&mut recv).await {
Ok(m) => m,
Err(_) => continue,
};
// Rate-limit verdict: process, silently drop, or close the connection.
match gate.check() {
crate::ratelimit::Verdict::Allow => {}
crate::ratelimit::Verdict::Drop => continue,
crate::ratelimit::Verdict::Close => {
tracing::warn!(peer = %remote_id.fmt_short(), "control-plane flood; closing connection");
initial_conn.close(VarInt::from_u32(forward::ABUSE_CODE), b"control flood");
return;
}
}
match msg {
ControlMsg::MemberApproved { identity, ip, hostname, .. } => {
// NOTE(review): unlike InviteShare/InviteUsed, the sender is not
// checked with `sender_is_coordinator` here — confirm intended.
let entry = ApprovedEntry { identity, ip, hostname, user_identity: None, device_cert: None, collision_index: 0 };
let mut s = live_state.write().unwrap();
let members = s.members.clone();
let _ = s.approved.approve(entry, &members);
}
ControlMsg::MemberSync => {
reconverge_notify.notify_one();
}
ControlMsg::BlobUpdated => {
reconverge_notify.notify_one();
}
ControlMsg::AdminGrant { network_pubkey, secret_key } => {
// Accept only a grant for this network whose key passes
// `admin_grant_key_valid` against the network public key.
if network_pubkey != net_pubkey_c {
tracing::warn!(
peer = %remote_id.fmt_short(),
"admin grant for a different network; ignoring"
);
continue;
}
if !admin_grant_key_valid(secret_key, net_pubkey_c) {
tracing::warn!(
peer = %remote_id.fmt_short(),
"admin grant key does not match network pubkey; ignoring"
);
continue;
}
let key = SecretKey::from(secret_key);
// Persist the key (best-effort), then mark ourselves coordinator in the live state.
if let Ok(Some(mut net)) = config::load_network(&network_name) {
net.network_secret_key = Some(key.clone());
let _ = config::save_network(&net);
}
let endpoint_id = endpoint_c.id();
{
let mut s = live_state.write().unwrap();
s.network_secret_key = Some(key.clone());
if let Some(m) = s.members.get_mut(&my_identity_c) {
m.is_coordinator = true;
}
s.refresh_snapshot();
}
if let Ok(client) = dht::create_pkarr_client(&endpoint_c) {
spawn_lazy_publisher(
client,
key,
live_state.clone(),
endpoint_id,
peers_c.clone(),
network_name.clone(),
token.clone(),
);
tracing::info!(
network = %network_name,
"promoted to co-coordinator; lazy publisher started"
);
}
// Announce the promotion on `promote_tx` whether or not the publisher started.
let _ = promote_tx.send(network_name.clone()).await;
}
ControlMsg::InviteShare { id, secret_hash, expires } => {
if !sender_is_coordinator(&live_state, remote_id) {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteShare from non-coordinator");
continue;
}
let Ok(hash) = String::from_utf8(secret_hash) else {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteShare with non-utf8 hash");
continue;
};
// The invite lock is held across the store's load and update.
let _guard = invite_lock.lock().await;
if let Ok(mut store) = crate::invite::InviteStore::load(&network_name) {
let _ = store.record_shared(id, hash, expires);
}
}
ControlMsg::InviteUsed { secret_hash } => {
if !sender_is_coordinator(&live_state, remote_id) {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteUsed from non-coordinator");
continue;
}
let Ok(hash) = String::from_utf8(secret_hash) else {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteUsed with non-utf8 hash");
continue;
};
let _guard = invite_lock.lock().await;
if let Ok(mut store) = crate::invite::InviteStore::load(&network_name) {
let _ = store.burn_by_hash(&hash);
}
}
ControlMsg::Ping { nonce } => {
respond_pong(&initial_conn, nonce).await;
}
ControlMsg::Pong { nonce } => {
// Complete the waiter registered for this nonce, if any.
if let Some((_, tx)) = pending_pongs.remove(&nonce) {
let _ = tx.send(());
}
}
// Every other message type is ignored on this loop.
_ => {}
}
}
// `accept_bi` failing ends the loop.
Err(_) => return,
}
}
}
}
}
});
Ok(JoinResult::Joined(live_state))
}
#[allow(clippy::too_many_arguments)]
fn spawn_reconnect_loop(
mut disconnect_rx: mpsc::Receiver<forward::DisconnectEvent>,
ep: Endpoint,
alpn: Vec<u8>,
network_name: String,
my_identity: EndpointId,
my_ip: Ipv4Addr,
ctx: MeshCtx,
disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
token: CancellationToken,
device_cert: Option<control::DeviceCert>,
) -> JoinHandle<()> {
let MeshCtx {
peers,
tun_tx,
stats,
firewall,
device_user_map,
..
} = ctx;
use tracing::Instrument as _;
let span = tracing::info_span!("reconnect", net = %network_name);
let reconnect_loop = async move {
loop {
let event = tokio::select! {
_ = token.cancelled() => return,
event = disconnect_rx.recv() => match event {
Some(ev) => ev,
None => return,
},
};
let peer_id = event.endpoint_id;
let peer_ip = event.ip;
let peer_ipv6 = event.ipv6;
peers.remove_peer_from_network(&peer_ip, &peer_ipv6, &event.network);
if event.intentional {
tracing::info!(peer = %peer_id.fmt_short(), ip = %peer_ip, "peer left, not reconnecting");
continue;
}
tracing::info!(peer = %peer_id.fmt_short(), ip = %peer_ip, "peer disconnected, will reconnect");
let ep = ep.clone();
let alpn = alpn.clone();
let network_name = network_name.clone();
let peers = peers.clone();
let tun_tx = tun_tx.clone();
let disconnect_tx = disconnect_tx.clone();
let token = token.clone();
let stats = stats.clone();
let firewall = firewall.clone();
let device_cert = device_cert.clone();
let device_user_map = device_user_map.clone();
tokio::spawn(async move {
let mut backoff = BACKOFF_INITIAL;
loop {
if token.is_cancelled() {
return;
}
tracing::info!(peer = %peer_id.fmt_short(), secs = backoff.as_secs(), "reconnecting in");
tokio::select! {
_ = token.cancelled() => return,
_ = tokio::time::sleep(backoff) => {}
}
backoff = (backoff * 2).min(BACKOFF_MAX);
match transport::connect_to_peer_with_alpn(&ep, peer_id, &alpn).await {
Ok(conn) => {
let (mut send, _) = match conn.open_bi().await {
Ok(bi) => bi,
Err(e) => {
tracing::warn!(error = %e, "reconnect handshake failed");
continue;
}
};
if let Err(e) = control::send_msg(
&mut send,
&ControlMsg::MeshHello {
identity: my_identity,
ip: my_ip,
hostname: outgoing_hostname(&network_name),
device_cert: device_cert.clone(),
},
)
.await
{
tracing::warn!(error = %e, "reconnect MeshHello failed");
continue;
}
tracing::info!(peer = %peer_id.fmt_short(), ip = %peer_ip, "reconnected to peer");
peers.add(peer_ip, peer_ipv6, conn.clone(), peer_id, &network_name);
forward::spawn_peer_reader(
conn,
peer_id,
peer_ip,
peer_ipv6,
network_name,
forward::ForwardCtx {
firewall,
tun_tx,
disconnect_tx,
token,
stats,
device_user_map,
},
);
return;
}
Err(e) => {
tracing::debug!(error = %e, "reconnect attempt failed");
}
}
}
});
}
};
tokio::spawn(reconnect_loop.instrument(span))
}
/// Opens a fresh bidirectional stream on `conn` and writes `msg` to it.
///
/// # Errors
/// Returns an error if the stream cannot be opened (with the context
/// "open control stream") or if sending the message fails.
async fn open_and_send(conn: &Connection, msg: &ControlMsg) -> Result<()> {
    let streams = conn.open_bi().await.context("open control stream")?;
    let (mut tx, _rx) = streams;
    control::send_msg(&mut tx, msg).await
}
/// Sends a `MemberSync` message to `conn`, ignoring any failure.
async fn send_member_sync(conn: &Connection) {
    let sync = ControlMsg::MemberSync;
    let _ = open_and_send(conn, &sync).await;
}
/// Answers a ping by sending `Pong` with the same `nonce` on `conn`, ignoring
/// any failure.
async fn respond_pong(conn: &Connection, nonce: u64) {
    let pong = ControlMsg::Pong { nonce };
    let _ = open_and_send(conn, &pong).await;
}
async fn broadcast_member_sync(peers: &PeerTable, exclude_ip: Option<Ipv4Addr>) {
for (ip, conn) in peers.all_connections() {
if Some(ip) == exclude_ip {
continue;
}
if let Err(e) = open_and_send(&conn, &ControlMsg::MemberSync).await {
tracing::warn!(peer_ip = %ip, error = %e, "failed to sync members");
}
}
}
/// Sends `msg` to every connection in `peers`, one after another, ignoring
/// individual send failures.
async fn broadcast_control_msg(peers: &PeerTable, msg: &ControlMsg) {
    for (_, conn) in peers.all_connections() {
        let outcome = open_and_send(&conn, msg).await;
        // Best-effort fan-out: a peer we cannot reach is simply skipped.
        let _ = outcome;
    }
}
/// Tests for the diagnostic report bundle helpers.
#[cfg(test)]
mod report_tests {
    use super::{collect_recent_logs, write_bundle};

    /// The bundle must be a gzip-compressed tar holding exactly the given files.
    #[test]
    fn test_write_bundle_is_valid_targz() {
        let scratch = std::env::temp_dir().join(format!("rayfish-test-{}", std::process::id()));
        std::fs::create_dir_all(&scratch).unwrap();
        let bundle_path = scratch.join("bundle.tgz");

        let files = vec![
            ("sysinfo.txt".to_string(), b"rayfish 0.1.0\n".to_vec()),
            (
                "logs/rayfish.log.2026-06-23".to_string(),
                b"hello log\n".to_vec(),
            ),
        ];
        write_bundle(&bundle_path, &files).unwrap();

        // Read the archive back and collect the entry names.
        let bundle = std::fs::File::open(&bundle_path).unwrap();
        let mut archive = tar::Archive::new(flate2::read::GzDecoder::new(bundle));
        let mut names: Vec<String> = Vec::new();
        for entry in archive.entries().unwrap() {
            let entry = entry.unwrap();
            names.push(entry.path().unwrap().to_string_lossy().into_owned());
        }
        names.sort();
        assert_eq!(names, vec!["logs/rayfish.log.2026-06-23", "sysinfo.txt"]);

        let _ = std::fs::remove_dir_all(&scratch);
    }

    /// Smoke test: collecting logs must not panic; the result is not inspected.
    #[test]
    fn test_collect_recent_logs_missing_dir_is_empty() {
        let _ = collect_recent_logs();
    }
}
/// Tests for accept-handler role selection plus a few pure helpers
/// (`choose_path_index`, `rename_satisfied`, promotion decisions).
#[cfg(test)]
mod accept_handler_tests {
use super::*;
use std::collections::BTreeMap;
use std::sync::Arc;
/// Builds an empty restricted-mode network state named "test-net" with a fixed network key.
fn make_network_state() -> SharedNetworkState {
let net_secret = iroh::SecretKey::from_bytes(&[1u8; 32]);
let net_pub = net_secret.public();
Arc::new(std::sync::RwLock::new(NetworkState {
members: MemberList::new(),
approved: ApprovedList::new(),
snapshot: None,
network_secret_key: None,
network_public_key: net_pub,
network_name: Some("test-net".to_string()),
mode: GroupMode::Restricted,
suggested_firewall: SuggestedFirewall::default(),
reusable_keys: BTreeMap::new(),
pending_suggestions: Vec::new(),
pending: HashMap::new(),
}))
}
/// Builds a `MeshCtx` with fresh, empty tables around the given identity and blob store.
fn sample_mesh_ctx(identity: IrohIdentityProvider, blob_store: FsStore) -> MeshCtx {
// The receiver half is dropped immediately; these tests never send on `tun_tx`.
let (tun_tx, _) = tokio::sync::mpsc::channel(1);
MeshCtx {
identity,
peers: PeerTable::new(),
tun_tx,
stats: Arc::new(ForwardMetrics::default()),
blob_store,
firewall: SharedFirewall::new(crate::firewall::FirewallConfig::default()),
hostname_table: dns::new_hostname_table(),
reverse_table: dns::new_reverse_table(),
device_user_map: peers::DeviceUserMap::new(),
}
}
/// Builds a coordinator-flavoured accept handler backed by a temporary blob store.
async fn sample_coordinator_handler() -> AcceptHandler {
// NOTE(review): `tmp` is dropped when this function returns, while the store
// loaded from its path lives on in the handler — fine for these tests,
// which never touch the store; confirm before reusing the fixture.
let tmp = tempfile::tempdir().unwrap();
let blob_store = FsStore::load(tmp.path()).await.unwrap();
let (disconnect_tx, _) = tokio::sync::mpsc::channel(1);
let my_key = iroh::SecretKey::from_bytes(&[2u8; 32]);
let my_id = my_key.public();
AcceptHandler::Coordinator(Arc::new(CoordinatorAcceptState {
ctx: sample_mesh_ctx(IrohIdentityProvider::new(my_id, 0), blob_store),
network_name: "test-net".to_string(),
state: make_network_state(),
disconnect_tx,
token: tokio_util::sync::CancellationToken::new(),
dht_notify: None,
invite_lock: Arc::new(tokio::sync::Mutex::new(())),
pending_pongs: Arc::new(DashMap::new()),
}))
}
/// Builds a member-flavoured accept handler backed by a temporary blob store.
async fn sample_member_handler() -> AcceptHandler {
let tmp = tempfile::tempdir().unwrap();
let blob_store = FsStore::load(tmp.path()).await.unwrap();
let (disconnect_tx, _) = tokio::sync::mpsc::channel(1);
let my_key = iroh::SecretKey::from_bytes(&[3u8; 32]);
AcceptHandler::Member(Arc::new(MemberAcceptState {
ctx: sample_mesh_ctx(IrohIdentityProvider::new(my_key.public(), 0), blob_store),
network_name: "test-net".to_string(),
state: make_network_state(),
disconnect_tx,
token: tokio_util::sync::CancellationToken::new(),
}))
}
// Checks only that `is_coordinator` distinguishes the two handler variants.
#[tokio::test]
async fn register_replaces_member_handler_with_coordinator() {
assert!(!sample_member_handler().await.is_coordinator());
assert!(sample_coordinator_handler().await.is_coordinator());
}
#[test]
fn holds_key_implies_coordinator_role() {
assert_eq!(role_for_key_holder(true), NetworkRole::Coordinator);
assert_eq!(role_for_key_holder(false), NetworkRole::Member);
}
// A flagged entry wins even when a higher-preference type comes first.
#[test]
fn choose_path_prefers_selected() {
use ipc::ConnType::*;
let classes = [(Relay, false), (Direct, true)];
assert_eq!(super::choose_path_index(&classes), Some(1));
}
// With nothing flagged, Direct beats Relay and Tor; a lone entry is chosen as-is.
#[test]
fn choose_path_falls_back_to_best_unselected() {
use ipc::ConnType::*;
let classes = [(Relay, false), (Direct, false), (Tor, false)];
assert_eq!(super::choose_path_index(&classes), Some(1));
let only_relay = [(Relay, false)];
assert_eq!(super::choose_path_index(&only_relay), Some(0));
}
#[test]
fn choose_path_empty_is_none() {
assert_eq!(super::choose_path_index(&[]), None);
}
// Accepts the exact name or "<name>-<digits>"; rejects everything else, including a bare trailing dash.
#[test]
fn rename_satisfied_exact_and_collision_forms() {
assert!(super::rename_satisfied("scw-iroh", Some("scw-iroh")));
assert!(super::rename_satisfied("alice", Some("alice-1")));
assert!(super::rename_satisfied("alice", Some("alice-42")));
assert!(!super::rename_satisfied("scw-iroh", Some("bell")));
assert!(!super::rename_satisfied("alice", Some("alice-bob")));
assert!(!super::rename_satisfied("alice", Some("alicex")));
assert!(!super::rename_satisfied("alice", Some("alice-")));
assert!(!super::rename_satisfied("alice", None));
}
// Only a current member should be promoted; an existing coordinator should not.
#[test]
fn promote_is_idempotent_decision() {
assert!(should_promote(NetworkRole::Member));
assert!(!should_promote(NetworkRole::Coordinator));
}
}
/// Tests for coordinator dial ordering, admin-grant key validation and gossip
/// target selection.
#[cfg(test)]
mod coordinator_dial_order_tests {
    use super::*;
    use crate::membership::{Member, derive_ip};

    /// Secret key whose bytes are all zero except the first, which is `seed`.
    fn seeded_secret(seed: u8) -> iroh::SecretKey {
        let mut raw = [0u8; 32];
        raw[0] = seed;
        iroh::SecretKey::from(raw)
    }

    /// Deterministic endpoint id derived from `seed`.
    fn test_id(seed: u8) -> EndpointId {
        seeded_secret(seed).public()
    }

    /// Minimal roster member with the given id and coordinator flag.
    fn member(id: EndpointId, coord: bool) -> Member {
        Member {
            identity: id,
            ip: derive_ip(&id),
            is_coordinator: coord,
            hostname: None,
            user_identity: None,
            device_cert: None,
            collision_index: 0,
        }
    }

    #[test]
    fn dial_order_puts_minter_first_then_other_coordinators() {
        let a = test_id(1);
        let b = test_id(2);
        let c = test_id(3);
        let me = test_id(9);
        let members = vec![
            member(a, true),
            member(b, true),
            member(c, false),
            member(me, true),
        ];
        assert_eq!(super::coordinator_dial_order(b, &members, me), vec![b, a]);
    }

    #[test]
    fn admin_grant_key_accepted_only_when_public_matches_network() {
        let net_secret = seeded_secret(42);
        let net_pubkey = net_secret.public();
        assert!(super::admin_grant_key_valid(
            net_secret.to_bytes(),
            net_pubkey
        ));

        // A key from a different seed must be rejected for this network.
        let forged = seeded_secret(7);
        assert!(!super::admin_grant_key_valid(forged.to_bytes(), net_pubkey));
    }

    #[test]
    fn gossip_targets_are_coordinator_peers_only() {
        let a = test_id(1);
        let b = test_id(2);
        let c = test_id(3);
        let members = vec![member(a, true), member(b, false), member(c, true)];
        let me = a;
        assert_eq!(super::gossip_targets(&members, me), vec![c]);
    }
}
/// Tests for choosing the first welcoming outcome among dial attempts.
#[cfg(test)]
mod dial_fallback_tests {
    use super::*;

    #[test]
    fn dial_fallback_stops_on_first_welcome() {
        let outcomes = vec![
            DialOutcome::Unreachable,
            DialOutcome::Welcomed,
            DialOutcome::Denied,
        ];
        // The welcome sits at index 1; the later `Denied` must not matter.
        assert_eq!(pick_first_welcome(&outcomes), (1, true));
    }

    #[test]
    fn dial_fallback_reports_failure_when_all_exhausted() {
        let outcomes = vec![DialOutcome::Unreachable, DialOutcome::Denied];
        let result = pick_first_welcome(&outcomes);
        assert!(!result.1);
    }
}