use bytes::Bytes;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::net::{Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use dashmap::{DashMap, DashSet};
use anyhow::{Context, Result};
use iroh::address_lookup::PkarrRelayClient;
use iroh::endpoint::{Connection, Endpoint, VarInt};
use iroh::protocol::{AcceptError, ProtocolHandler};
use iroh::{EndpointId, SecretKey};
use iroh_blobs::store::fs::FsStore;
use iroh_blobs::{BlobsProtocol, HashAndFormat};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::audit;
use crate::config;
use crate::control::{self, ControlMsg};
use crate::dht;
use crate::dns;
use crate::dns_config;
use crate::firewall::{self, SharedFirewall};
use crate::forward;
use crate::identity;
use crate::ipc::{self, FirewallRuleView, IpcMessage, NetworkRole, NetworkStatus, PeerStatus};
use crate::membership::{
ApprovedEntry, ApprovedList, GroupMode, IdentityProvider, IrohIdentityProvider, Member,
MemberList, canonical_group_bytes, derive_ipv6, group_blob_hash, verify_group_blob,
};
use crate::network_name;
use crate::peers::{self, PeerTable};
use crate::stats::ForwardMetrics;
use crate::transport;
use crate::tun::{self, check_cgnat_conflict};
use ray_proto::SuggestedFirewall;
/// Starting backoff delay (1 s).
/// NOTE(review): not referenced in this part of the file; presumably the
/// initial retry delay for reconnect logic elsewhere — confirm.
const BACKOFF_INITIAL: Duration = Duration::from_secs(1);
/// Upper bound for the backoff delay (30 s).
/// NOTE(review): not referenced in this part of the file — confirm where the
/// backoff is applied.
const BACKOFF_MAX: Duration = Duration::from_secs(30);
/// ALPN for the one-shot device-pairing protocol. It is always advertised by
/// `ProtocolRouter::alpns` and served by `ProtocolRouter::spawn_accept_loop`.
const PAIR_ALPN: &[u8] = b"rayfish/pair/1";
/// Everything the coordinator-side accept handler for one network needs.
///
/// An instance is registered per network ALPN (see
/// `DaemonState::register_coordinator_handler`). It is used to admit joiners
/// and to re-attach reconnecting members.
struct CoordinatorAcceptState {
    /// Name of the network this handler serves.
    network_name: String,
    /// Local identity; used to derive fallback IPs and to sign gossip.
    identity: IrohIdentityProvider,
    /// Shared membership / approval / snapshot state for the network.
    state: Arc<std::sync::RwLock<NetworkState>>,
    /// Table of live peer connections.
    peers: PeerTable,
    /// Handed to each peer's packet reader (`forward::spawn_peer_reader`).
    tun_tx: mpsc::Sender<Bytes>,
    /// Handed to each peer's packet reader for disconnect notifications.
    disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
    /// Cancellation token passed to the per-peer reader tasks.
    token: CancellationToken,
    /// Forwarding counters shared with the packet readers.
    stats: Arc<ForwardMetrics>,
    /// Notified (`notify_one`) after a new member is admitted, if present.
    dht_notify: Option<Arc<tokio::sync::Notify>>,
    /// Blob store that receives each refreshed group snapshot.
    blob_store: FsStore,
    /// Firewall consulted by the packet readers.
    firewall: SharedFirewall,
    /// Forward DNS table updated with admitted members' hostnames.
    hostname_table: dns::HostnameTable,
    /// Reverse DNS table updated alongside `hostname_table`.
    reverse_table: dns::ReverseLookupTable,
    /// Maps a device's endpoint id to the user identity from its certificate.
    device_user_map: peers::DeviceUserMap,
    /// Serializes loads/redeems/restores of the on-disk invite store.
    invite_lock: Arc<tokio::sync::Mutex<()>>,
    /// Shared with the control readers; presumably outstanding ping ids
    /// awaiting a pong — confirm in `spawn_coordinator_control_reader`.
    pending_pongs: Arc<DashMap<u64, tokio::sync::oneshot::Sender<()>>>,
}
impl CoordinatorAcceptState {
/// Handle an inbound connection on this network's mesh ALPN while acting as
/// coordinator.
///
/// - Peers already in the member list are re-attached straight away. They are
///   added to the peer table. Then, on a background task, they are sent a
///   member sync and given a coordinator control reader and a packet reader.
/// - Any other peer must open a bi-directional stream within 5 s and send a
///   `JoinRequest` or `MeshHello` within a further 5 s. Otherwise the
///   connection is dropped. A presented device certificate must verify and
///   name this connection's key.
/// - The join is then admitted in any of these cases:
///   - the peer is pre-approved;
///   - it redeems a single-use invite;
///   - it redeems a reusable key;
///   - the group is `Open`.
/// - In `Restricted` mode without an invite, the join is queued in
///   `NetworkState::pending` and the peer receives `JoinPending`.
async fn handle_connection(&self, conn: Connection) {
    let remote_id = conn.remote_id();
    let member_ip = {
        let s = self.state.read().unwrap();
        s.members.get(&remote_id).map(|m| m.ip)
    };
    // For non-members this identity-derived address is only logged and passed
    // to `admit_peer`, which ignores it and assigns the real IP itself.
    let peer_ip = member_ip.unwrap_or_else(|| self.identity.derive_ip(&remote_id));
    let is_member = member_ip.is_some();
    if is_member {
        tracing::info!(ip = %peer_ip, "known member reconnecting");
        crate::spawn_path_logger(conn.clone(), remote_id.fmt_short().to_string());
        let peer_ipv6 = derive_ipv6(&remote_id);
        self.peers.add(
            peer_ip,
            peer_ipv6,
            conn.clone(),
            remote_id,
            &self.network_name,
        );
        // Owned clones for the `'static` background task below.
        let token = self.token.clone();
        let stats = self.stats.clone();
        let tun_tx = self.tun_tx.clone();
        let disconnect_tx = self.disconnect_tx.clone();
        let network = self.network_name.clone();
        let firewall = self.firewall.clone();
        let state = self.state.clone();
        let hostname_table = self.hostname_table.clone();
        let reverse_table = self.reverse_table.clone();
        let device_user_map = self.device_user_map.clone();
        let peers_ctrl = self.peers.clone();
        let blob_store_ctrl = self.blob_store.clone();
        let dht_notify_ctrl = self.dht_notify.clone();
        let token_ctrl = token.clone();
        let network_ctrl = network.clone();
        let invite_lock_ctrl = self.invite_lock.clone();
        let pending_pongs_ctrl = self.pending_pongs.clone();
        tokio::spawn(async move {
            // The member sync is awaited before either reader is started.
            send_member_sync(&conn).await;
            spawn_coordinator_control_reader(
                conn.clone(),
                remote_id,
                peer_ip,
                network_ctrl,
                state,
                hostname_table,
                reverse_table,
                device_user_map.clone(),
                peers_ctrl,
                blob_store_ctrl,
                dht_notify_ctrl,
                token_ctrl,
                invite_lock_ctrl,
                pending_pongs_ctrl,
            );
            forward::spawn_peer_reader(
                conn,
                remote_id,
                peer_ip,
                peer_ipv6,
                network,
                firewall,
                tun_tx,
                disconnect_tx,
                token,
                stats,
                device_user_map,
            );
        });
        return;
    }
    // Non-member: the joiner must open a stream and speak first, each step
    // bounded by a 5 s timeout.
    let (send, mut recv) =
        match tokio::time::timeout(Duration::from_secs(5), conn.accept_bi()).await {
            Ok(Ok(pair)) => pair,
            _ => return,
        };
    let msg = match tokio::time::timeout(Duration::from_secs(5), control::recv_msg(&mut recv))
        .await
    {
        Ok(Ok(m)) => m,
        _ => return,
    };
    // Only `JoinRequest` can carry an invite secret; any other message ends
    // the exchange.
    let (invite_secret, hostname, device_cert) = match msg {
        ControlMsg::JoinRequest {
            invite_secret,
            hostname,
            device_cert,
        } => (invite_secret, hostname, device_cert),
        ControlMsg::MeshHello {
            hostname,
            device_cert,
            ..
        } => (None, hostname, device_cert),
        _ => return,
    };
    if let Some(ref cert) = device_cert {
        if !cert.verify() || cert.device_key != remote_id {
            tracing::warn!(peer = %remote_id.fmt_short(), "invalid device certificate");
            return;
        }
        // NOTE(review): recorded before the join is decided, so a denied or
        // queued peer still leaves an entry in `device_user_map`.
        self.device_user_map.insert(remote_id, cert.user_identity);
    }
    let is_approved = self.state.read().unwrap().approved.is_approved(&remote_id);
    if is_approved {
        self.admit_peer(
            conn,
            send,
            remote_id,
            peer_ip,
            hostname,
            device_cert,
            true,
            false,
        )
        .await;
        return;
    }
    if let Some(secret) = invite_secret {
        // Try a single-use invite first. Load + redeem is serialized with
        // other users of the invite store via `invite_lock`.
        let redeemed = {
            let _guard = self.invite_lock.lock().await;
            match crate::invite::InviteStore::load(&self.network_name) {
                Ok(mut store) => store.redeem(&secret, remote_id),
                Err(e) => Err(e),
            }
        };
        match redeemed {
            Ok(invite_hostname) => {
                tracing::info!(peer = %remote_id.fmt_short(), "invite redeemed");
                // A hostname stored with the invite wins over the one the
                // peer asked for, and is passed to `admit_peer` as
                // authoritative.
                let authoritative = invite_hostname.is_some();
                let assigned = invite_hostname.or(hostname);
                let admitted = self
                    .admit_peer(
                        conn,
                        send,
                        remote_id,
                        peer_ip,
                        assigned,
                        device_cert,
                        false,
                        authoritative,
                    )
                    .await;
                if !admitted {
                    // Admission was denied: put the invite back so it is not
                    // burned by a failed join.
                    let _guard = self.invite_lock.lock().await;
                    if let Ok(mut store) = crate::invite::InviteStore::load(&self.network_name)
                    {
                        let _ = store.restore(&secret);
                    }
                } else {
                    // Gossip `InviteUsed` (keyed by the secret's hash) to the
                    // coordinators in the current member list.
                    let secret_hash = crate::invite::hash_secret(&secret);
                    let members: Vec<crate::membership::Member> = self
                        .state
                        .read()
                        .unwrap()
                        .members
                        .all()
                        .into_iter()
                        .cloned()
                        .collect();
                    gossip_to_coordinators(
                        &self.peers,
                        &self.network_name,
                        &members,
                        self.identity.local_identity(),
                        &ControlMsg::InviteUsed {
                            secret_hash: secret_hash.into_bytes(),
                        },
                    )
                    .await;
                }
            }
            Err(single_use_err) => {
                // Not a valid single-use invite; fall back to reusable keys.
                let reusable_id = {
                    let s = self.state.read().unwrap();
                    crate::membership::validate_reusable_key(
                        &s.reusable_keys,
                        &secret,
                        now_secs(),
                    )
                    .map(|k| k.id.clone())
                };
                if let Some(key_id) = reusable_id {
                    tracing::info!(
                        peer = %remote_id.fmt_short(),
                        key_id = %key_id,
                        "reusable key redeemed"
                    );
                    self.admit_peer(
                        conn,
                        send,
                        remote_id,
                        peer_ip,
                        hostname,
                        device_cert,
                        false,
                        false,
                    )
                    .await;
                } else {
                    tracing::warn!(peer = %remote_id.fmt_short(), error = %single_use_err, "invite rejected");
                    self.deny(&conn, send, format!("invite rejected: {single_use_err}"))
                        .await;
                }
            }
        }
        return;
    }
    // No invite: the group mode decides.
    let mode = self.state.read().unwrap().mode;
    match mode {
        GroupMode::Open => {
            self.admit_peer(
                conn,
                send,
                remote_id,
                peer_ip,
                hostname,
                device_cert,
                false,
                false,
            )
            .await;
        }
        GroupMode::Restricted => {
            // Queue for manual approval and tell the peer it is pending.
            {
                let mut s = self.state.write().unwrap();
                s.pending.insert(
                    remote_id,
                    PendingJoin {
                        hostname,
                        device_cert,
                        requested_at: Instant::now(),
                    },
                );
            }
            tracing::info!(peer = %remote_id.fmt_short(), ip = %peer_ip, "join queued for approval");
            let mut send = send;
            let _ = control::send_msg(&mut send, &ControlMsg::JoinPending).await;
            // Give the peer up to 5 s to read the reply and close.
            let _ = tokio::time::timeout(Duration::from_secs(5), conn.closed()).await;
        }
    }
}
/// Reject a joining peer: send `JoinDenied { reason }` on `send`, then allow
/// the peer up to five seconds to close the connection itself. Failures in
/// either step are ignored; this is best-effort notification.
async fn deny(&self, conn: &Connection, mut send: iroh::endpoint::SendStream, reason: String) {
    let denial = ControlMsg::JoinDenied { reason };
    let _ = control::send_msg(&mut send, &denial).await;
    let grace_period = Duration::from_secs(5);
    let _ = tokio::time::timeout(grace_period, conn.closed()).await;
}
// The admission inputs are independent values coming from different join
// paths, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
/// Admit `remote_id` as a member and bring its connection online.
///
/// Steps:
/// 1. Assign an IP via `membership::assign_ip`.
/// 2. Settle the hostname with `hostname::admission_hostname`; deny on conflict.
/// 3. Deny if the IP is already held by a different identity in the member or
///    approved list.
/// 4. Record the member, then refresh and store the group snapshot.
/// 5. Publish DNS and broadcast `MemberApproved`.
/// 6. Send `Welcome`, notify `dht_notify` and broadcast a member sync.
/// 7. Start the control and packet readers.
///
/// - `_suggested_ip` is ignored; the IP always comes from `assign_ip`.
/// - `was_approved` also removes the peer from the approved list.
/// - `authoritative` is forwarded to `admission_hostname`.
///
/// Returns `false` if the peer was denied, `true` once it is admitted.
///
/// NOTE(review): the state lock is released between the IP assignment, the
/// collision check and the insert, and the result of `members.add` is
/// ignored. Concurrent admissions are therefore not atomic — confirm this is
/// acceptable.
async fn admit_peer(
    &self,
    conn: Connection,
    mut send: iroh::endpoint::SendStream,
    remote_id: EndpointId,
    _suggested_ip: Ipv4Addr,
    hostname: Option<String>,
    device_cert: Option<control::DeviceCert>,
    was_approved: bool,
    authoritative: bool,
) -> bool {
    let (peer_ip, collision_index) = {
        let s = self.state.read().unwrap();
        crate::membership::assign_ip(&s.members, &remote_id)
    };
    // Hostnames already used by *other* members are off limits.
    let final_hostname = if let Some(desired) = hostname {
        let taken = {
            let s = self.state.read().unwrap();
            s.members
                .all()
                .iter()
                .filter(|m| m.identity != remote_id)
                .filter_map(|m| m.hostname.clone())
                .collect::<Vec<String>>()
        };
        let taken_refs: Vec<&str> = taken.iter().map(|s| s.as_str()).collect();
        match crate::hostname::admission_hostname(&desired, &taken_refs, authoritative) {
            Ok(name) => Some(name),
            Err(conflict) => {
                self.deny(
                    &conn,
                    send,
                    format!("hostname '{conflict}' is already in use on this network"),
                )
                .await;
                return false;
            }
        }
    } else {
        None
    };
    // The IP is a collision only if a *different* identity already owns it.
    let collision = {
        let s = self.state.read().unwrap();
        if let Some(existing) = s.members.get_by_ip(peer_ip) {
            existing.identity != remote_id
        } else if let Some(existing) = s.approved.get_by_ip(peer_ip) {
            existing.identity != remote_id
        } else {
            false
        }
    };
    if collision {
        self.deny(
            &conn,
            send,
            format!("IP collision: {peer_ip} already assigned"),
        )
        .await;
        return false;
    }
    let user_id_opt = device_cert.as_ref().map(|c| c.user_identity);
    // Commit the membership change and capture the new snapshot bytes while
    // holding the write lock; the blob store write happens after release.
    let snap_bytes = {
        let mut s = self.state.write().unwrap();
        if was_approved {
            s.approved.remove(&remote_id);
        }
        s.pending.remove(&remote_id);
        let _ = s.members.add(Member {
            identity: remote_id,
            ip: peer_ip,
            is_coordinator: false,
            hostname: final_hostname.clone(),
            user_identity: user_id_opt,
            device_cert: device_cert.clone(),
            collision_index,
        });
        s.refresh_snapshot();
        s.snapshot.as_ref().map(|snap| snap.msgpack_bytes.clone())
    };
    if let Some(bytes) = snap_bytes {
        let _ = self.blob_store.blobs().add_slice(&bytes).await;
    }
    if let Some(ref h) = final_hostname {
        dns::update_hostname(
            &self.hostname_table,
            &self.reverse_table,
            &self.network_name,
            h,
            peer_ip,
            derive_ipv6(&remote_id),
        )
        .await;
    }
    broadcast_control_msg(
        &self.peers,
        &ControlMsg::MemberApproved {
            identity: remote_id,
            ip: peer_ip,
            hostname: final_hostname.clone(),
            device_cert: device_cert.clone(),
        },
    )
    .await;
    let (members, approved) = {
        let s = self.state.read().unwrap();
        (
            s.members.all().into_iter().cloned().collect::<Vec<_>>(),
            s.approved.all().into_iter().cloned().collect::<Vec<_>>(),
        )
    };
    tracing::info!(ip = %peer_ip, "new member admitted and joined");
    // Reply on the joiner's own stream with the full member/approved lists.
    let _ = control::send_msg(
        &mut send,
        &ControlMsg::Welcome {
            members: members.clone(),
            approved,
        },
    )
    .await;
    if let Some(notify) = &self.dht_notify {
        notify.notify_one();
    }
    // NOTE(review): `Some(peer_ip)` presumably excludes the new member from
    // this sync — confirm in `broadcast_member_sync`.
    broadcast_member_sync(&self.peers, Some(peer_ip)).await;
    let peer_ipv6 = derive_ipv6(&remote_id);
    crate::spawn_path_logger(conn.clone(), remote_id.fmt_short().to_string());
    self.peers.add(
        peer_ip,
        peer_ipv6,
        conn.clone(),
        remote_id,
        &self.network_name,
    );
    spawn_coordinator_control_reader(
        conn.clone(),
        remote_id,
        peer_ip,
        self.network_name.clone(),
        self.state.clone(),
        self.hostname_table.clone(),
        self.reverse_table.clone(),
        self.device_user_map.clone(),
        self.peers.clone(),
        self.blob_store.clone(),
        self.dht_notify.clone(),
        self.token.clone(),
        self.invite_lock.clone(),
        self.pending_pongs.clone(),
    );
    forward::spawn_peer_reader(
        conn,
        remote_id,
        peer_ip,
        peer_ipv6,
        self.network_name.clone(),
        self.firewall.clone(),
        self.tun_tx.clone(),
        self.disconnect_tx.clone(),
        self.token.clone(),
        self.stats.clone(),
        self.device_user_map.clone(),
    );
    true
}
}
/// State for the accept handler used when this node is an ordinary member
/// (not coordinator) of a network. It serves direct mesh connections from
/// other members.
struct MemberAcceptState {
    /// Name of the network this handler serves.
    network_name: String,
    /// Shared membership / approval / snapshot state for the network.
    state: Arc<std::sync::RwLock<NetworkState>>,
    /// Table of live peer connections.
    peers: PeerTable,
    /// Handed to each peer's packet reader.
    tun_tx: mpsc::Sender<Bytes>,
    /// Handed to each peer's packet reader for disconnect notifications.
    disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
    /// Cancellation token passed to the per-peer reader tasks.
    token: CancellationToken,
    /// Forwarding counters shared with the packet readers.
    stats: Arc<ForwardMetrics>,
    /// Blob store that receives refreshed group snapshots.
    blob_store: FsStore,
    /// Firewall consulted by the packet readers.
    firewall: SharedFirewall,
    /// Forward DNS table for peer hostnames.
    hostname_table: dns::HostnameTable,
    /// Reverse DNS table updated alongside `hostname_table`.
    reverse_table: dns::ReverseLookupTable,
    /// Maps a device's endpoint id to the user identity from its certificate.
    device_user_map: peers::DeviceUserMap,
}
impl MemberAcceptState {
async fn handle_connection(&self, conn: Connection) {
let Ok((_send, mut recv)) = conn.accept_bi().await else {
return;
};
let transport_id = conn.remote_id();
let Ok(ControlMsg::MeshHello {
identity: peer_identity,
ip,
hostname,
device_cert,
..
}) = control::recv_msg(&mut recv).await
else {
return;
};
let effective_user_id = if peer_identity == transport_id {
peer_identity
} else if let Some(ref cert) = device_cert {
if !cert.verify()
|| cert.device_key != transport_id
|| cert.user_identity != peer_identity
{
tracing::warn!(peer = %transport_id.fmt_short(), "invalid device certificate");
return;
}
cert.user_identity
} else {
return;
};
if let Some(ref cert) = device_cert {
self.device_user_map
.insert(transport_id, cert.user_identity);
}
let _ = effective_user_id;
let (is_member, is_approved) = {
let s = self.state.read().unwrap();
(
s.members.is_member(&peer_identity),
s.approved.is_approved(&peer_identity),
)
};
let final_hostname = if let Some(desired) = hostname {
let taken: Vec<String> = {
let s = self.state.read().unwrap();
s.members
.all()
.iter()
.filter(|m| m.identity != peer_identity)
.filter_map(|m| m.hostname.clone())
.collect()
};
let taken_refs: Vec<&str> = taken.iter().map(|s| s.as_str()).collect();
Some(crate::hostname::resolve_collision(&desired, &taken_refs))
} else {
None
};
if let Some(ref h) = final_hostname {
let ipv6 = derive_ipv6(&peer_identity);
dns::update_hostname(
&self.hostname_table,
&self.reverse_table,
&self.network_name,
h,
ip,
ipv6,
)
.await;
}
if is_approved {
let (snap_bytes, ip) = {
let mut s = self.state.write().unwrap();
let approved_entry = s.approved.remove(&peer_identity);
let user_id_opt = device_cert.as_ref().map(|c| c.user_identity);
let (member_ip, member_idx) = approved_entry
.as_ref()
.map(|e| (e.ip, e.collision_index))
.unwrap_or((ip, 0));
let _ = s.members.add(Member {
identity: peer_identity,
ip: member_ip,
is_coordinator: false,
hostname: final_hostname.clone(),
user_identity: user_id_opt,
device_cert: device_cert.clone(),
collision_index: member_idx,
});
s.refresh_snapshot();
(
s.snapshot.as_ref().map(|snap| snap.msgpack_bytes.clone()),
member_ip,
)
};
if let Some(bytes) = snap_bytes {
let _ = self.blob_store.blobs().add_slice(&bytes).await;
}
let (members, approved_list) = {
let s = self.state.read().unwrap();
(
s.members.all().into_iter().cloned().collect::<Vec<_>>(),
s.approved.all().into_iter().cloned().collect::<Vec<_>>(),
)
};
if let Ok((mut send, _)) = conn.open_bi().await {
let _ = control::send_msg(
&mut send,
&ControlMsg::Welcome {
members: members.clone(),
approved: approved_list,
},
)
.await;
}
let peer_ipv6 = derive_ipv6(&peer_identity);
self.peers.add(
ip,
peer_ipv6,
conn.clone(),
peer_identity,
&self.network_name,
);
forward::spawn_peer_reader(
conn,
peer_identity,
ip,
peer_ipv6,
self.network_name.clone(),
self.firewall.clone(),
self.tun_tx.clone(),
self.disconnect_tx.clone(),
self.token.clone(),
self.stats.clone(),
self.device_user_map.clone(),
);
broadcast_member_sync(&self.peers, Some(ip)).await;
} else if is_member {
if final_hostname.is_some() {
let mut s = self.state.write().unwrap();
if let Some(m) = s.members.get_mut(&peer_identity) {
m.hostname = final_hostname;
}
}
let peer_ipv6 = derive_ipv6(&peer_identity);
self.peers.add(
ip,
peer_ipv6,
conn.clone(),
peer_identity,
&self.network_name,
);
forward::spawn_peer_reader(
conn,
peer_identity,
ip,
peer_ipv6,
self.network_name.clone(),
self.firewall.clone(),
self.tun_tx.clone(),
self.disconnect_tx.clone(),
self.token.clone(),
self.stats.clone(),
self.device_user_map.clone(),
);
}
}
}
/// Role-specific connection handler registered for a network's mesh ALPN.
enum AcceptHandler {
    /// This node holds the coordinator role for the network.
    Coordinator(Arc<CoordinatorAcceptState>),
    /// This node is an ordinary member of the network.
    Member(Arc<MemberAcceptState>),
}
#[cfg(test)]
impl AcceptHandler {
    /// Test helper: reports whether this handler serves the coordinator role.
    fn is_coordinator(&self) -> bool {
        match self {
            AcceptHandler::Coordinator(_) => true,
            AcceptHandler::Member(_) => false,
        }
    }
}
/// Adapter that exposes an [`AcceptHandler`] through iroh's
/// `ProtocolHandler` trait; stored per ALPN in `ProtocolRouter::handlers`.
struct MeshProtocol {
    /// Coordinator or member handler that does the actual work.
    handler: AcceptHandler,
}
impl std::fmt::Debug for MeshProtocol {
    /// Prints only the type name; the wrapped handler is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("MeshProtocol");
        out.finish()
    }
}
impl ProtocolHandler for MeshProtocol {
    /// Forward the connection to the role-specific handler.
    ///
    /// Both handlers deal with their own failures, so this always reports
    /// `Ok(())` to iroh.
    async fn accept(&self, conn: Connection) -> Result<(), AcceptError> {
        match self.handler {
            AcceptHandler::Coordinator(ref coordinator) => {
                coordinator.handle_connection(conn).await
            }
            AcceptHandler::Member(ref member) => member.handle_connection(conn).await,
        }
        Ok(())
    }
}
/// A file offer received on `transport::FILES_ALPN` and queued until the
/// user accepts it.
struct PendingFile {
    /// Local id from `ProtocolRouter::file_id_counter` (starts at 1).
    id: u64,
    /// Sender; checked to equal the connection's remote id before queuing.
    from: EndpointId,
    /// File name as given by the sender.
    filename: String,
    /// Size as claimed by the sender (units not shown here; presumably bytes).
    size: u64,
    /// MIME type as claimed by the sender.
    mime_type: String,
    /// BLAKE3 hash of the offered blob.
    blob_hash: blake3::Hash,
}
/// A connect request received on `transport::CONNECT_ALPN` that has not been
/// approved yet. It is stored in `ProtocolRouter::pending_connects`, keyed by
/// `from_endpoint`.
#[derive(Clone)]
struct PendingConnect {
    /// Requester's contact id, as claimed in the request.
    from_contact_id: EndpointId,
    /// Requester's endpoint; verified to match the connection's remote id.
    from_endpoint: EndpointId,
    /// Hostname the requester asked for, if any.
    hostname: Option<String>,
    /// When the request was received.
    requested_at: Instant,
}
/// Routes every incoming connection on the shared endpoint by ALPN.
///
/// The fixed protocols are handled here: blobs, file offers, pairing and
/// connect requests. Per-network mesh handlers are registered at runtime.
struct ProtocolRouter {
    /// Handler for the iroh-blobs ALPN.
    blobs: BlobsProtocol,
    /// Mesh handlers keyed by network ALPN (`register` / `unregister`).
    handlers: DashMap<Vec<u8>, Arc<MeshProtocol>>,
    /// File offers awaiting acceptance.
    pending_files: Arc<std::sync::Mutex<Vec<PendingFile>>>,
    /// Source of `PendingFile::id`; starts at 1.
    file_id_counter: Arc<AtomicU64>,
    /// Active pairing secret, if any; taken on the first pairing attempt.
    pairing_secret: Arc<std::sync::Mutex<Option<[u8; 32]>>>,
    /// Key used to create device certificates for paired devices.
    secret_key: SecretKey,
    /// Connect requests awaiting approval, keyed by requesting endpoint.
    pending_connects: Arc<DashMap<EndpointId, PendingConnect>>,
    /// Endpoints already approved. The value is `(room_id, coordinator)`,
    /// which is returned in the `Approved` reply.
    approved_connects: Arc<DashMap<EndpointId, (EndpointId, EndpointId)>>,
    /// NOTE(review): not used in this part of the file; presumably endpoints
    /// we have dialed with a connect request — confirm.
    outgoing_connects: Arc<DashSet<EndpointId>>,
    /// Shared with coordinator handlers; presumably outstanding ping ids.
    pending_pongs: Arc<DashMap<u64, tokio::sync::oneshot::Sender<()>>>,
}
impl ProtocolRouter {
/// Build a router with no mesh handlers and empty request/offer queues.
///
/// `pairing_secret` is shared with the daemon so a pairing session started
/// there is visible to the pair handler here.
fn new(
    blobs: BlobsProtocol,
    secret_key: SecretKey,
    pairing_secret: Arc<std::sync::Mutex<Option<[u8; 32]>>>,
) -> Self {
    // File offer ids are handed out starting at 1.
    let file_id_counter = Arc::new(AtomicU64::new(1));
    let pending_files = Arc::new(std::sync::Mutex::new(Vec::new()));
    let pending_connects = Arc::new(DashMap::new());
    let approved_connects = Arc::new(DashMap::new());
    let outgoing_connects = Arc::new(DashSet::new());
    let pending_pongs = Arc::new(DashMap::new());
    Self {
        blobs,
        secret_key,
        pairing_secret,
        handlers: DashMap::new(),
        pending_files,
        file_id_counter,
        pending_connects,
        approved_connects,
        outgoing_connects,
        pending_pongs,
    }
}
/// Install the mesh handler for `alpn`, replacing any previous one.
fn register(&self, alpn: Vec<u8>, handler: AcceptHandler) {
    let protocol = Arc::new(MeshProtocol { handler });
    self.handlers.insert(alpn, protocol);
}
/// Remove the mesh handler for `alpn`; a no-op if none is registered.
fn unregister(&self, alpn: &[u8]) {
    let _previous = self.handlers.remove(alpn);
}
/// List every ALPN the endpoint should advertise.
///
/// The registered mesh ALPNs come first, followed by the fixed blobs, files,
/// pairing and connect protocols.
fn alpns(&self) -> Vec<Vec<u8>> {
    let mut advertised: Vec<Vec<u8>> = self
        .handlers
        .iter()
        .map(|entry| entry.key().clone())
        .collect();
    advertised.extend([
        iroh_blobs::protocol::ALPN.to_vec(),
        transport::FILES_ALPN.to_vec(),
        PAIR_ALPN.to_vec(),
        transport::CONNECT_ALPN.to_vec(),
    ]);
    advertised
}
fn spawn_accept_loop(
self: &Arc<Self>,
endpoint: Endpoint,
cancel: CancellationToken,
) -> tokio::task::JoinHandle<()> {
let router = self.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => return,
incoming = endpoint.accept() => {
let Some(incoming) = incoming else { return };
let router = router.clone();
tokio::spawn(async move {
let conn = match incoming.await {
Ok(c) => c,
Err(e) => {
tracing::debug!(error = ?e, "incoming handshake failed");
return;
}
};
let alpn = conn.alpn().to_vec();
match alpn.as_slice() {
a if a == iroh_blobs::protocol::ALPN => {
let _ = router.blobs.clone().accept(conn).await;
}
a if a == transport::FILES_ALPN => {
let pending = router.pending_files.clone();
let counter = router.file_id_counter.clone();
let remote_id = conn.remote_id();
match conn.accept_bi().await {
Ok((_send, mut recv)) => {
match control::recv_msg(&mut recv).await {
Ok(control::ControlMsg::FileOffer { from, filename, size, mime_type, blob_hash }) => {
if from == remote_id {
let id = counter.fetch_add(1, Ordering::Relaxed);
tracing::info!(from = %from.fmt_short(), filename = %filename, size, "file offer received");
pending.lock().unwrap().push(PendingFile { id, from, filename, size, mime_type, blob_hash });
} else {
tracing::warn!(claimed = %from.fmt_short(), actual = %remote_id.fmt_short(), "file offer identity mismatch");
}
}
Ok(other) => {
tracing::warn!(msg = ?other, "unexpected control message on FILES_ALPN");
}
Err(e) => {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to read file offer");
}
}
}
Err(e) => {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to accept bi stream for file offer");
}
}
}
a if a == PAIR_ALPN => {
let pairing_secret = router.pairing_secret.clone();
let secret_key = router.secret_key.clone();
let remote_id = conn.remote_id();
match conn.accept_bi().await {
Ok((mut send, mut recv)) => {
let mut len_buf = [0u8; 4];
if let Err(e) = recv.read_exact(&mut len_buf).await {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to read pair request length");
return;
}
let body_len = u32::from_be_bytes(len_buf) as usize;
let mut body = vec![0u8; body_len];
if let Err(e) = recv.read_exact(&mut body).await {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to read pair request body");
return;
}
let request: control::PairMsg = match rmp_serde::from_slice(&body) {
Ok(r) => r,
Err(e) => {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to decode pair request");
return;
}
};
match request {
control::PairMsg::Request { secret, device_pubkey } => {
let stored = pairing_secret.lock().unwrap().take();
match stored {
Some(expected) if expected == secret => {
let cert = control::DeviceCert::create(&secret_key, &device_pubkey);
let response = control::PairMsg::Response { cert };
let response_bytes = match rmp_serde::to_vec_named(&response) {
Ok(b) => b,
Err(e) => {
tracing::warn!(error = %e, "failed to encode pair response");
return;
}
};
let len = (response_bytes.len() as u32).to_be_bytes();
if let Err(e) = send.write_all(&len).await {
tracing::warn!(error = %e, "failed to send pair response length");
return;
}
if let Err(e) = send.write_all(&response_bytes).await {
tracing::warn!(error = %e, "failed to send pair response body");
return;
}
let _ = send.finish();
let _ = tokio::time::timeout(
Duration::from_secs(5),
conn.closed(),
)
.await;
tracing::info!(device = %device_pubkey.fmt_short(), "device paired successfully");
}
Some(_) => {
tracing::warn!(peer = %remote_id.fmt_short(), "pairing secret mismatch");
}
None => {
tracing::warn!(peer = %remote_id.fmt_short(), "no pairing session active");
}
}
}
_ => {
tracing::warn!(peer = %remote_id.fmt_short(), "unexpected pair message type");
}
}
}
Err(e) => {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to accept bi stream for pairing");
}
}
}
a if a == transport::CONNECT_ALPN => {
let pending = router.pending_connects.clone();
let approved = router.approved_connects.clone();
let remote_id = conn.remote_id();
match conn.accept_bi().await {
Ok((mut send, mut recv)) => {
let request: control::ConnectMsg = match control::recv_framed(&mut recv).await {
Ok(r) => r,
Err(e) => {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to read connect request");
return;
}
};
if let control::ConnectMsg::Request { from_contact_id, from_endpoint, hostname } = request {
if from_endpoint != remote_id {
tracing::warn!(claimed = %from_endpoint.fmt_short(), actual = %remote_id.fmt_short(), "connect request endpoint mismatch");
let _ = control::send_framed(&mut send, &control::ConnectMsg::Denied { reason: "endpoint mismatch".to_string() }).await;
return;
}
let already = approved.get(&from_endpoint).map(|r| *r.value());
let reply = if let Some((room_id, coordinator)) = already {
control::ConnectMsg::Approved { room_id, coordinator }
} else {
pending.insert(from_endpoint, PendingConnect {
from_contact_id,
from_endpoint,
hostname,
requested_at: Instant::now(),
});
tracing::info!(from = %from_contact_id.fmt_short(), endpoint = %from_endpoint.fmt_short(), "connect request received");
control::ConnectMsg::Pending
};
if let Err(e) = control::send_framed(&mut send, &reply).await {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to send connect reply");
return;
}
let _ = tokio::time::timeout(Duration::from_secs(5), conn.closed()).await;
} else {
tracing::warn!(peer = %remote_id.fmt_short(), "unexpected connect message type");
}
}
Err(e) => {
tracing::warn!(error = %e, peer = %remote_id.fmt_short(), "failed to accept bi stream for connect");
}
}
}
_ => {
if let Some(handler) = router.handlers.get(&alpn).map(|r| r.clone()) {
let _ = handler.accept(conn).await;
} else {
tracing::warn!(
alpn = %String::from_utf8_lossy(&alpn),
"no handler for ALPN"
);
}
}
}
});
}
}
}
})
}
}
/// Cached canonical encoding of the group state, refreshed by
/// `NetworkState::refresh_snapshot`.
#[derive(Clone)]
struct GroupSnapshot {
    /// BLAKE3 hash of `msgpack_bytes`.
    hash: blake3::Hash,
    /// Output of `membership::canonical_group_bytes`.
    msgpack_bytes: Vec<u8>,
}
/// Mutable per-network state, shared behind an `RwLock` by the accept
/// handlers and the daemon.
struct NetworkState {
    /// Current members (identity, IP, hostname, certificate, ...).
    members: MemberList,
    /// Identities approved to join but not yet members.
    approved: ApprovedList,
    /// Cached canonical encoding and hash; see `refresh_snapshot`.
    snapshot: Option<GroupSnapshot>,
    /// Network secret key. NOTE(review): presumably `Some` only on nodes that
    /// hold the network key — confirm.
    network_secret_key: Option<SecretKey>,
    /// Public key identifying the network.
    network_public_key: EndpointId,
    /// Human-readable network name, included in the canonical snapshot.
    network_name: Option<String>,
    /// Join policy (`Open` admits directly, `Restricted` queues joins).
    mode: GroupMode,
    /// Firewall suggestion published with the group snapshot.
    suggested_firewall: SuggestedFirewall,
    /// Reusable join keys checked by `membership::validate_reusable_key`.
    reusable_keys: BTreeMap<String, crate::membership::ReusableKey>,
    /// Firewall rules suggested to this node that are awaiting a decision.
    pending_suggestions: Vec<firewall::FirewallRule>,
    /// Joins queued for approval in `Restricted` mode.
    pending: HashMap<EndpointId, PendingJoin>,
}
/// A join request waiting for manual approval (`GroupMode::Restricted`).
struct PendingJoin {
    /// Hostname the joiner asked for, if any.
    hostname: Option<String>,
    /// Device certificate presented by the joiner (already verified).
    device_cert: Option<control::DeviceCert>,
    /// When the request was queued.
    requested_at: Instant,
}
impl NetworkState {
    /// Re-encode the group with `canonical_group_bytes` and cache both the
    /// bytes and their BLAKE3 hash in `snapshot`.
    fn refresh_snapshot(&mut self) {
        let msgpack_bytes = canonical_group_bytes(
            &self.members,
            &self.approved,
            &self.suggested_firewall,
            self.network_name.as_deref(),
            &self.reusable_keys,
        );
        let fresh = GroupSnapshot {
            hash: blake3::hash(&msgpack_bytes),
            msgpack_bytes,
        };
        self.snapshot = Some(fresh);
    }
}
// Some fields are held only to keep resources alive or for later use.
#[allow(dead_code)]
/// Daemon-side handle for one joined or created network.
pub struct NetworkHandle {
    /// Network name (also the key in `DaemonState::networks`).
    name: String,
    /// Public key of the network; used to derive its mesh ALPN.
    network_key: EndpointId,
    /// This node's role; switched to `Coordinator` on promotion.
    role: NetworkRole,
    /// This node's IPv4 address in the network.
    my_ip: Ipv4Addr,
    /// Shared network state (also held by the accept handlers).
    state: Arc<std::sync::RwLock<NetworkState>>,
    /// DHT wake-up signal, if DHT publishing is enabled.
    dht_notify: Option<Arc<tokio::sync::Notify>>,
    /// Cancellation token for this network's tasks.
    cancel: CancellationToken,
    /// Background tasks belonging to this network.
    tasks: Vec<JoinHandle<()>>,
    /// Serializes access to the on-disk invite store.
    invite_lock: Arc<tokio::sync::Mutex<()>>,
    /// Disconnect channel handed to peer readers of this network.
    disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
}
/// Process-wide daemon state shared by the IPC handlers and network tasks.
pub struct DaemonState {
    /// The single iroh endpoint all protocols are served on.
    endpoint: Endpoint,
    /// Local identity provider.
    identity: IrohIdentityProvider,
    /// Live peer connections across all networks.
    peers: PeerTable,
    /// Forwarding counters.
    stats: Arc<ForwardMetrics>,
    /// Presumably the daemon start time (e.g. for uptime reporting) — confirm.
    start: Instant,
    /// Handed to peer readers (see `forward::spawn_peer_reader`).
    tun_tx: mpsc::Sender<Bytes>,
    /// Active networks keyed by network name.
    networks: Arc<DashMap<String, NetworkHandle>>,
    /// Cancelled when an IPC `Shutdown` request is handled.
    shutdown_token: CancellationToken,
    /// Blob store for group snapshots and files.
    blob_store: FsStore,
    /// Local firewall.
    firewall: SharedFirewall,
    /// ALPN router for the endpoint's accept loop.
    protocol_router: Arc<ProtocolRouter>,
    /// Forward DNS table.
    hostname_table: dns::HostnameTable,
    /// Reverse DNS table.
    reverse_table: dns::ReverseLookupTable,
    /// Whether mDNS is enabled.
    mdns_enabled: bool,
    /// TUN interface name; used when updating DNS search domains.
    tun_name: String,
    /// Active pairing secret, shared with `ProtocolRouter`.
    pairing_secret: Arc<std::sync::Mutex<Option<[u8; 32]>>>,
    /// This device's certificate, if it has been paired.
    device_cert: Option<control::DeviceCert>,
    /// Maps device endpoint ids to user identities.
    device_user_map: peers::DeviceUserMap,
    /// Contact id returned by the `ContactId` IPC request.
    contact_public: EndpointId,
    /// NOTE(review): presumably tracks `Up`/`Down` state — confirm.
    active: Arc<AtomicBool>,
    /// Platform DNS configurator, if one is installed.
    dns_configurator: Arc<std::sync::Mutex<Option<Box<dyn dns_config::DnsConfigurator>>>>,
    /// DNS resolver.
    resolver: std::sync::Arc<crate::dns_resolver::Resolver>,
    /// Cancels a running DNS re-assert task, if any.
    dns_reassert_token: std::sync::Mutex<Option<tokio_util::sync::CancellationToken>>,
    /// NOTE(review): presumably carries network names to promote to
    /// coordinator — confirm with the receiver.
    promote_tx: mpsc::Sender<String>,
}
/// Map "does this node hold the network key?" to the role it plays:
/// `Coordinator` if it does, `Member` otherwise.
fn role_for_key_holder(holds_network_key: bool) -> NetworkRole {
    match holds_network_key {
        true => NetworkRole::Coordinator,
        false => NetworkRole::Member,
    }
}
/// Check that the raw 32-byte `secret_key` is the private half of
/// `net_pubkey`, i.e. that its derived public key equals `net_pubkey`.
fn admin_grant_key_valid(secret_key: [u8; 32], net_pubkey: EndpointId) -> bool {
    let derived_public = SecretKey::from(secret_key).public();
    derived_public == net_pubkey
}
/// A node should be promoted only if it is not already a coordinator.
fn should_promote(current: NetworkRole) -> bool {
    let already_coordinator = current.is_coordinator();
    !already_coordinator
}
impl DaemonState {
/// Re-publish the router's ALPN list on the endpoint.
///
/// Also refreshes the DNS search domains from the current set of network
/// names.
async fn refresh_alpns(&self) {
    let alpns = self.protocol_router.alpns();
    let alpn_labels = alpns
        .iter()
        .map(|raw| String::from_utf8_lossy(raw).into_owned())
        .collect::<Vec<String>>();
    tracing::info!(alpns = ?alpn_labels, "refreshing ALPNs");
    self.endpoint.set_alpns(alpns);
    let names = self
        .networks
        .iter()
        .map(|entry| entry.key().clone())
        .collect::<Vec<String>>();
    dns_config::update_search_domains(&names, &self.tun_name).await;
}
// Each argument is an independent piece of per-network state.
#[allow(clippy::too_many_arguments)]
/// Register a coordinator accept handler on the ALPN derived from
/// `network_key`, and mark the network's handle as `Coordinator`.
///
/// The handler shares the daemon-wide peer table, stats, blob store,
/// firewall, DNS tables and device map with the rest of the daemon.
fn register_coordinator_handler(
    &self,
    network: &str,
    state: Arc<std::sync::RwLock<NetworkState>>,
    invite_lock: Arc<tokio::sync::Mutex<()>>,
    dht_notify: Option<Arc<tokio::sync::Notify>>,
    network_key: EndpointId,
    disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
    cancel: CancellationToken,
) {
    let alpn = transport::network_alpn(&network_key);
    let accept_state = CoordinatorAcceptState {
        network_name: network.to_string(),
        identity: self.identity.clone(),
        state,
        peers: self.peers.clone(),
        tun_tx: self.tun_tx.clone(),
        disconnect_tx,
        token: cancel,
        stats: self.stats.clone(),
        dht_notify,
        blob_store: self.blob_store.clone(),
        firewall: self.firewall.clone(),
        hostname_table: self.hostname_table.clone(),
        reverse_table: self.reverse_table.clone(),
        device_user_map: self.device_user_map.clone(),
        invite_lock,
        pending_pongs: self.protocol_router.pending_pongs.clone(),
    };
    self.protocol_router
        .register(alpn, AcceptHandler::Coordinator(Arc::new(accept_state)));
    if let Some(mut handle) = self.networks.get_mut(network) {
        handle.role = NetworkRole::Coordinator;
    }
}
/// Switch `network` to the coordinator accept handler and re-advertise ALPNs.
///
/// This is a no-op when the network is unknown or this node is already its
/// coordinator.
async fn promote_to_coordinator(&self, network: &str) {
    // Copy what is needed out of the map entry inside this scope. That
    // releases the DashMap read guard before `register_coordinator_handler`
    // calls `get_mut` on the same map.
    let (state, invite_lock, dht_notify, network_key, disconnect_tx, cancel) = {
        let Some(handle) = self.networks.get(network) else {
            return;
        };
        if !should_promote(handle.role.clone()) {
            return;
        }
        (
            handle.state.clone(),
            handle.invite_lock.clone(),
            handle.dht_notify.clone(),
            handle.network_key,
            handle.disconnect_tx.clone(),
            handle.cancel.clone(),
        )
    };
    self.register_coordinator_handler(
        network,
        state,
        invite_lock,
        dht_notify,
        network_key,
        disconnect_tx,
        cancel,
    );
    self.refresh_alpns().await;
    tracing::info!(network, "promoted to coordinator accept handler");
}
fn check_authorized(req: &IpcMessage, peer_cred: Option<(u32, u32)>) -> Option<IpcMessage> {
if matches!(
req,
IpcMessage::Status
| IpcMessage::Report
| IpcMessage::FirewallShow
| IpcMessage::FirewallSuggestions { .. }
| IpcMessage::FirewallPending { .. }
| IpcMessage::ListFiles
| IpcMessage::Connections
| IpcMessage::ContactId
| IpcMessage::Ping { .. }
| IpcMessage::Netcheck
) {
return None;
}
let uid = peer_cred.map(|(uid, _)| uid);
if uid == Some(0) {
return None;
}
if matches!(req, IpcMessage::SetOperator { .. }) {
return Some(IpcMessage::Error {
message: "permission denied: granting operator access requires root \
(re-run with sudo)"
.to_string(),
});
}
let operator = config::load().ok().and_then(|c| c.operator_uid);
if uid.is_some() && uid == operator {
return None;
}
Some(IpcMessage::Error {
message: "permission denied: this user is not authorized to control rayfish.\n\
Grant access with: sudo ray set-operator <user>"
.to_string(),
})
}
fn set_operator(&self, uid: u32) -> IpcMessage {
let mut app_config = match config::load() {
Ok(c) => c,
Err(e) => {
return IpcMessage::Error {
message: format!("failed to load config: {e}"),
};
}
};
app_config.operator_uid = Some(uid);
if let Err(e) = config::save_settings(&app_config) {
return IpcMessage::Error {
message: format!("failed to save config: {e}"),
};
}
IpcMessage::Ok {
message: format!("operator set to uid {uid}; that user can now run ray without sudo"),
}
}
/// Authorize and dispatch one IPC request.
///
/// `peer_cred` is the caller's credentials; only the first element (the uid)
/// is inspected by `check_authorized`. The second element is presumably the
/// gid — TODO confirm at the call site.
///
/// Unauthorized requests get the error produced by `check_authorized`.
/// Everything else is routed to the matching handler method, and the
/// handler's reply is returned.
async fn handle_request(
    self: &Arc<Self>,
    req: IpcMessage,
    peer_cred: Option<(u32, u32)>,
) -> IpcMessage {
    if let Some(denied) = Self::check_authorized(&req, peer_cred) {
        return denied;
    }
    match req {
        IpcMessage::Create {
            mode,
            name,
            hostname,
            transport: _,
        } => self.create_network(mode, name, hostname).await,
        IpcMessage::Join {
            network_key,
            name,
            hostname,
            transport: _,
            invite,
            coordinator,
            auto_accept_firewall,
        } => {
            self.join_network(
                &network_key,
                name.as_deref(),
                hostname,
                invite,
                coordinator,
                auto_accept_firewall,
            )
            .await
        }
        IpcMessage::Leave { name } => self.leave_network(&name).await,
        IpcMessage::Nuke { name, force } => self.nuke_network(&name, force).await,
        IpcMessage::Status => self.status(),
        IpcMessage::Report => self.build_report(peer_cred),
        IpcMessage::Up { hostname } => self.activate(hostname).await,
        IpcMessage::Down => self.deactivate().await,
        // Shutdown only signals the token; the reply is sent before the
        // daemon actually stops.
        IpcMessage::Shutdown => {
            self.shutdown_token.cancel();
            IpcMessage::Ok {
                message: "shutting down".to_string(),
            }
        }
        IpcMessage::FirewallAdd {
            direction,
            action,
            protocol,
            port,
            peer,
            network,
        } => self.firewall_add(
            direction,
            action,
            protocol,
            port.as_deref(),
            peer.as_deref(),
            network.as_deref(),
        ),
        IpcMessage::FirewallRemove { index } => self.firewall_remove(index),
        IpcMessage::FirewallShow => self.firewall_show(),
        IpcMessage::FirewallDefault { action } => self.firewall_default(action),
        IpcMessage::FirewallReject { enabled } => self.firewall_reject(enabled),
        IpcMessage::FirewallSuggest {
            network,
            suggestions,
        } => self.firewall_suggest(&network, suggestions).await,
        IpcMessage::FirewallSuggestions { network } => self.firewall_suggestions(&network),
        IpcMessage::FirewallPending { network } => self.firewall_pending(&network),
        IpcMessage::FirewallAccept { network } => self.firewall_accept(&network),
        IpcMessage::FirewallDeny { network } => self.firewall_deny(&network),
        IpcMessage::FirewallResolveSuggestions {
            network,
            accept,
            deny,
        } => self.firewall_resolve_suggestions(&network, &accept, &deny),
        IpcMessage::FirewallAutoAccept { network, enabled } => {
            self.firewall_auto_accept(&network, enabled)
        }
        IpcMessage::SetHostname { network, hostname } => {
            self.set_hostname(&network, &hostname).await
        }
        IpcMessage::SendFile { path, peer } => self.send_file(&path, &peer).await,
        IpcMessage::ListFiles => self.list_files(),
        IpcMessage::AcceptFile { id, output } => self.accept_file(id, output, peer_cred).await,
        IpcMessage::StartPairing => self.start_pairing(),
        IpcMessage::PairWithDevice {
            endpoint_id,
            secret,
        } => self.pair_with_device(endpoint_id, secret).await,
        IpcMessage::SetOperator { uid } => self.set_operator(uid),
        IpcMessage::InviteCreate {
            network,
            expires_secs,
            hostname,
            reusable,
        } => {
            self.invite_create(&network, expires_secs, hostname, reusable)
                .await
        }
        IpcMessage::InviteList { network } => self.invite_list(&network).await,
        IpcMessage::InviteRevoke { network, id } => self.invite_revoke(&network, &id).await,
        IpcMessage::Requests { network } => self.list_requests(&network),
        IpcMessage::AcceptRequest { network, id } => self.accept_request(&network, &id).await,
        IpcMessage::DenyRequest { network, id } => self.deny_request(&network, &id),
        IpcMessage::AdminAdd { network, identity } => self.admin_add(&network, &identity).await,
        IpcMessage::AdminList { network } => self.admin_list(&network),
        IpcMessage::Connect {
            contact_id,
            hostname,
        } => self.connect(&contact_id, hostname).await,
        IpcMessage::Connections => self.list_connections(),
        IpcMessage::ApproveConnection { id } => self.approve_connection(&id).await,
        IpcMessage::ContactId => IpcMessage::ContactIdResponse {
            contact_id: self.contact_public.to_string(),
        },
        IpcMessage::RotateContact => self.rotate_contact().await,
        IpcMessage::Ping {
            peer,
            count,
            interval_ms,
        } => self.ping(&peer, count, interval_ms).await,
        IpcMessage::Netcheck => self.netcheck().await,
        // Any variant not handled above is reported back as an error.
        other => IpcMessage::Error {
            message: format!("unexpected message: {:?}", other),
        },
    }
}
#[tracing::instrument(skip(self, hostname), fields(mode = ?mode))]
async fn create_network(
&self,
mode: GroupMode,
name: Option<String>,
hostname: Option<String>,
) -> IpcMessage {
match self
.create_network_inner(mode, name, hostname, false, None)
.await
{
Ok(resp) => resp,
Err(e) => IpcMessage::Error {
message: format!("{e:#}"),
},
}
}
async fn create_network_inner(
&self,
mode: GroupMode,
custom_name: Option<String>,
hostname: Option<String>,
direct: bool,
pre_approve: Option<(EndpointId, Option<String>)>,
) -> Result<IpcMessage> {
let name = match custom_name {
Some(n) => {
anyhow::ensure!(
crate::hostname::is_valid_hostname(&n),
"invalid network name '{n}': use 1-63 lowercase ASCII letters, digits, or hyphens (no leading/trailing hyphen)"
);
n
}
None => network_name::generate_name(),
};
let net_secret_key = SecretKey::generate();
let net_public_key = net_secret_key.public();
if self.networks.contains_key(&name) {
return Ok(IpcMessage::Error {
message: format!("network '{name}' already active"),
});
}
let my_ip = self.identity.local_ip();
let my_hostname = match hostname {
Some(h) => {
anyhow::ensure!(
crate::hostname::is_valid_hostname(&h),
"invalid hostname '{h}': use 1-63 lowercase ASCII letters, digits, or hyphens (no leading/trailing hyphen)"
);
h
}
None => config::load()
.ok()
.and_then(|c| c.default_hostname)
.unwrap_or_else(crate::hostname::generate_hostname),
};
let mut member_list = MemberList::new();
member_list
.add(Member {
identity: self.identity.local_identity(),
ip: my_ip,
is_coordinator: true,
hostname: Some(my_hostname.clone()),
user_identity: None,
device_cert: None,
collision_index: 0,
})
.expect("self-add cannot collide");
dns::update_hostname(
&self.hostname_table,
&self.reverse_table,
&name,
&my_hostname,
my_ip,
derive_ipv6(&self.identity.local_identity()),
)
.await;
let mut approved = ApprovedList::new();
if let Some((peer_id, peer_hostname)) = pre_approve {
let peer_ip = self.identity.derive_ip(&peer_id);
approved
.approve(
ApprovedEntry {
identity: peer_id,
ip: peer_ip,
hostname: peer_hostname,
user_identity: None,
device_cert: None,
collision_index: 0,
},
&member_list,
)
.map_err(|e| anyhow::anyhow!("failed to pre-approve peer: {e:?}"))?;
}
let mut net_state = NetworkState {
members: member_list,
approved,
snapshot: None,
network_secret_key: Some(net_secret_key.clone()),
network_public_key: net_public_key,
network_name: Some(name.clone()),
mode,
suggested_firewall: SuggestedFirewall::default(),
reusable_keys: BTreeMap::new(),
pending_suggestions: Vec::new(),
pending: HashMap::new(),
};
net_state.refresh_snapshot();
if let Some(snap) = &net_state.snapshot {
let _ = self.blob_store.blobs().add_slice(&snap.msgpack_bytes).await;
}
if let Ok(pkarr_client) = dht::create_pkarr_client(&self.endpoint) {
let blob_hash = net_state
.snapshot
.as_ref()
.map(|s| s.hash)
.expect("snapshot set");
if let Err(e) = dht::publish_network(
&pkarr_client,
&net_secret_key,
&blob_hash,
&[self.endpoint.id()],
)
.await
{
tracing::warn!(error = %e, "failed to publish network record");
}
}
let member_entries = net_state
.members
.all()
.into_iter()
.map(|m| config::MemberEntry {
identity: m.identity,
ip: m.ip,
is_coordinator: m.is_coordinator,
hostname: m.hostname.clone(),
})
.collect();
let approved_entries = net_state
.approved
.all()
.into_iter()
.map(|a| config::ApprovedConfigEntry {
identity: a.identity,
ip: a.ip,
hostname: a.hostname.clone(),
})
.collect();
config::save_network(&config::NetworkConfig {
name: name.clone(),
group_mode: mode,
my_ip: Some(my_ip),
my_hostname: Some(my_hostname.clone()),
pending_hostname: None,
members: member_entries,
approved: approved_entries,
network_secret_key: Some(net_secret_key.clone()),
network_public_key: Some(net_public_key),
transport: None,
auto_accept_firewall: false,
admins: vec![],
direct,
})?;
let cancel = self.shutdown_token.child_token();
let state = Arc::new(std::sync::RwLock::new(net_state));
let invite_lock = Arc::new(tokio::sync::Mutex::new(()));
let mut tasks = Vec::new();
let dht_notify = Arc::new(tokio::sync::Notify::new());
if let Ok(pkarr_client) = dht::create_pkarr_client(&self.endpoint) {
tasks.push(spawn_network_publisher(
pkarr_client,
net_secret_key.clone(),
state.clone(),
self.endpoint.id(),
self.peers.clone(),
name.clone(),
dht_notify.clone(),
cancel.clone(),
));
}
let (disconnect_tx, disconnect_rx) = mpsc::channel::<forward::DisconnectEvent>(64);
tasks.push(spawn_peer_cleanup(
disconnect_rx,
self.peers.clone(),
cancel.clone(),
Some(CoordinatorCleanup {
state: state.clone(),
blob_store: self.blob_store.clone(),
dht_notify: Some(dht_notify.clone()),
hostname_table: self.hostname_table.clone(),
reverse_table: self.reverse_table.clone(),
device_user_map: self.device_user_map.clone(),
network_name: name.clone(),
}),
));
let handle = NetworkHandle {
name: name.clone(),
network_key: net_public_key,
role: NetworkRole::Coordinator,
my_ip,
state: state.clone(),
dht_notify: Some(dht_notify.clone()),
cancel: cancel.clone(),
tasks,
invite_lock: invite_lock.clone(),
disconnect_tx: disconnect_tx.clone(),
};
self.networks.insert(name.clone(), handle);
self.register_coordinator_handler(
&name,
state,
invite_lock,
Some(dht_notify),
net_public_key,
disconnect_tx,
cancel,
);
self.refresh_alpns().await;
tracing::info!(name = %name, key = %net_public_key, ip = %my_ip, "network created");
Ok(IpcMessage::Created {
name,
network_key: net_public_key,
my_ip,
my_ipv6: Some(derive_ipv6(&self.identity.local_identity())),
})
}
#[tracing::instrument(skip(self, hostname), fields(net = name.unwrap_or(network_key)))]
async fn join_network(
self: &Arc<Self>,
network_key: &str,
name: Option<&str>,
hostname: Option<String>,
invite: Option<Vec<u8>>,
coordinator: Option<EndpointId>,
auto_accept_firewall: bool,
) -> IpcMessage {
match self
.join_network_inner(
network_key,
name,
hostname.clone(),
invite.clone(),
coordinator,
auto_accept_firewall,
true,
)
.await
{
Ok(TryJoin::Joined(resp)) => resp,
Ok(TryJoin::Pending) => {
let me = Arc::clone(self);
let nk = network_key.to_string();
let nm = name.map(|s| s.to_string());
tokio::spawn(async move {
let mut backoff = BACKOFF_INITIAL;
loop {
tokio::select! {
_ = me.shutdown_token.cancelled() => return,
_ = tokio::time::sleep(backoff) => {}
}
backoff = (backoff * 2).min(BACKOFF_MAX);
match me
.join_network_inner(
&nk,
nm.as_deref(),
hostname.clone(),
invite.clone(),
coordinator,
auto_accept_firewall,
true,
)
.await
{
Ok(TryJoin::Joined(_)) => {
tracing::info!(net = %nk, "approval granted — joined");
return;
}
Ok(TryJoin::Pending) => continue,
Err(e) => {
tracing::warn!(net = %nk, error = %e, "join retry failed");
}
}
}
});
IpcMessage::Ok {
message: "join request sent — waiting for coordinator approval (run `ray status` to check)"
.to_string(),
}
}
Err(e) => IpcMessage::Error {
message: format!("{e:#}"),
},
}
}
#[allow(clippy::too_many_arguments)]
async fn join_network_inner(
self: &Arc<Self>,
network_key: &str,
alias: Option<&str>,
hostname: Option<String>,
invite: Option<Vec<u8>>,
coordinator: Option<EndpointId>,
auto_accept_firewall: bool,
initial: bool,
) -> Result<TryJoin> {
let net_pubkey: EndpointId = network_key.parse().context("invalid network key")?;
if let Some(a) = alias
&& self.networks.contains_key(a)
{
anyhow::bail!("already in network '{a}'");
}
let pkarr_client = dht::create_pkarr_client(&self.endpoint)?;
let record = dht::resolve_network_packet(&pkarr_client, net_pubkey)
.await
.context("failed to resolve network record")?;
if let Some(net_ver) = dht::mesh_version_from_record(&record) {
let mine = transport::MESH_PROTOCOL_VERSION;
anyhow::ensure!(
net_ver == mine,
"incompatible mesh protocol: this network runs v{net_ver}, this build speaks v{mine} \
— run `ray update` so both sides match"
);
}
let (expected_hash, peer_ids) =
dht::decode_network_record(&record).context("invalid network record")?;
if peer_ids.is_empty() {
anyhow::bail!("no peers found in network record");
}
let blob_hash = iroh_blobs::Hash::from_bytes(*expected_hash.as_bytes());
let mut group_blob = None;
for peer_id in &peer_ids {
match self.try_fetch_group_blob(*peer_id, blob_hash).await {
Ok(data) => {
group_blob = Some(data);
break;
}
Err(e) => {
tracing::warn!(peer = %peer_id.fmt_short(), error = %e, "failed to fetch blob");
continue;
}
}
}
let data = group_blob.context("could not fetch group blob from any peer")?;
let alpn = transport::network_alpn(&net_pubkey);
let my_ip = self.identity.local_ip();
let blob_name = data
.name
.clone()
.unwrap_or_else(|| network_key[..network_key.len().min(8)].to_string());
let display_name_owned = alias.map(|a| a.to_string()).unwrap_or(blob_name);
let display_name = display_name_owned.as_str();
if self.networks.contains_key(display_name) {
anyhow::bail!("already in network '{display_name}'");
}
let my_hostname = match hostname {
Some(h) => {
anyhow::ensure!(
crate::hostname::is_valid_hostname(&h),
"invalid hostname '{h}': use 1-63 lowercase ASCII letters, digits, or hyphens (no leading/trailing hyphen)"
);
h
}
None => config::load()
.ok()
.and_then(|c| c.default_hostname)
.unwrap_or_else(crate::hostname::generate_hostname),
};
let invite_lock = Arc::new(tokio::sync::Mutex::new(()));
let (state, cancel, disconnect_tx, tasks) = if initial {
let my_id = self.identity.local_identity();
let minter = coordinator.unwrap_or(my_id);
let order = coordinator_dial_order(minter, &data.members, my_id);
if order.is_empty() {
anyhow::bail!("no coordinator found in network record");
}
type JoinResources = (
Arc<std::sync::RwLock<NetworkState>>,
CancellationToken,
mpsc::Sender<forward::DisconnectEvent>,
Vec<tokio::task::JoinHandle<()>>,
);
let mut last_err = anyhow::anyhow!("no coordinators tried");
let mut found: Option<JoinResources> = None;
for coordinator_id in &order {
let cancel = self.shutdown_token.child_token();
let (disconnect_tx, disconnect_rx) = mpsc::channel::<forward::DisconnectEvent>(64);
let tasks = vec![spawn_reconnect_loop(
disconnect_rx,
self.endpoint.clone(),
alpn.clone(),
display_name.to_string(),
my_id,
my_ip,
Some(my_hostname.clone()),
self.peers.clone(),
self.tun_tx.clone(),
disconnect_tx.clone(),
cancel.clone(),
self.stats.clone(),
self.firewall.clone(),
self.device_cert.clone(),
self.device_user_map.clone(),
)];
tracing::info!(coordinator = %coordinator_id.fmt_short(), "connecting to coordinator");
let conn = match transport::connect_to_peer_with_alpn(
&self.endpoint,
*coordinator_id,
&alpn,
)
.await
{
Ok(c) => c,
Err(e) => {
tracing::warn!(coordinator = %coordinator_id.fmt_short(), error = %e, "coordinator unreachable, trying next");
cancel.cancel();
for t in tasks {
t.abort();
}
last_err = anyhow::anyhow!("coordinator offline: {e}");
continue;
}
};
match join_mesh_shared(
conn,
&self.endpoint,
display_name,
&self.identity,
&alpn,
Some(my_hostname.clone()),
self.peers.clone(),
self.tun_tx.clone(),
disconnect_tx.clone(),
cancel.clone(),
self.stats.clone(),
self.blob_store.clone(),
self.firewall.clone(),
net_pubkey,
self.device_cert.clone(),
self.device_user_map.clone(),
self.hostname_table.clone(),
self.reverse_table.clone(),
invite.clone(),
data.suggested_firewall.clone(),
data.reusable_keys.clone(),
auto_accept_firewall,
true,
self.promote_tx.clone(),
invite_lock.clone(),
self.protocol_router.pending_pongs.clone(),
)
.await
{
Ok(JoinResult::Joined(state)) => {
found = Some((state, cancel, disconnect_tx, tasks));
break;
}
Ok(JoinResult::Pending) => {
cancel.cancel();
for t in tasks {
t.abort();
}
return Ok(TryJoin::Pending);
}
Err(e) => {
tracing::warn!(coordinator = %coordinator_id.fmt_short(), error = %e, "coordinator denied or unreachable, trying next");
cancel.cancel();
for t in tasks {
t.abort();
}
last_err = e;
continue;
}
}
}
match found {
Some(resources) => resources,
None => anyhow::bail!(
"no coordinator admitted the join (tried {}): {last_err:#}",
order.len()
),
}
} else {
let coordinator_id = coordinator
.or_else(|| {
data.members
.iter()
.find(|m| m.is_coordinator)
.map(|m| m.identity)
})
.context("no coordinator found in network record")?;
tracing::info!(coordinator = %coordinator_id.fmt_short(), "connecting to coordinator");
let conn = transport::connect_to_peer_with_alpn(&self.endpoint, coordinator_id, &alpn)
.await
.map_err(|e| {
anyhow::anyhow!("coordinator offline; cannot join this network right now: {e}")
})?;
let cancel = self.shutdown_token.child_token();
let (disconnect_tx, disconnect_rx) = mpsc::channel::<forward::DisconnectEvent>(64);
let tasks = vec![spawn_reconnect_loop(
disconnect_rx,
self.endpoint.clone(),
alpn.clone(),
display_name.to_string(),
self.identity.local_identity(),
my_ip,
Some(my_hostname.clone()),
self.peers.clone(),
self.tun_tx.clone(),
disconnect_tx.clone(),
cancel.clone(),
self.stats.clone(),
self.firewall.clone(),
self.device_cert.clone(),
self.device_user_map.clone(),
)];
let state = match join_mesh_shared(
conn,
&self.endpoint,
display_name,
&self.identity,
&alpn,
Some(my_hostname.clone()),
self.peers.clone(),
self.tun_tx.clone(),
disconnect_tx.clone(),
cancel.clone(),
self.stats.clone(),
self.blob_store.clone(),
self.firewall.clone(),
net_pubkey,
self.device_cert.clone(),
self.device_user_map.clone(),
self.hostname_table.clone(),
self.reverse_table.clone(),
invite,
data.suggested_firewall.clone(),
data.reusable_keys.clone(),
auto_accept_firewall,
false,
self.promote_tx.clone(),
invite_lock.clone(),
self.protocol_router.pending_pongs.clone(),
)
.await?
{
JoinResult::Joined(state) => state,
JoinResult::Pending => {
cancel.cancel();
return Ok(TryJoin::Pending);
}
};
(state, cancel, disconnect_tx, tasks)
};
let state = state;
let held_key = state.read().unwrap().network_secret_key.clone();
let role = role_for_key_holder(held_key.is_some());
match role {
NetworkRole::Coordinator => {
let net_public_key = state.read().unwrap().network_public_key;
self.register_coordinator_handler(
display_name,
state.clone(),
invite_lock.clone(),
None,
net_public_key,
disconnect_tx.clone(),
cancel.clone(),
);
}
NetworkRole::Member | NetworkRole::Direct => {
self.protocol_router.register(
alpn.clone(),
AcceptHandler::Member(Arc::new(MemberAcceptState {
network_name: display_name.to_string(),
state: state.clone(),
peers: self.peers.clone(),
tun_tx: self.tun_tx.clone(),
disconnect_tx: disconnect_tx.clone(),
token: cancel.clone(),
stats: self.stats.clone(),
blob_store: self.blob_store.clone(),
firewall: self.firewall.clone(),
hostname_table: self.hostname_table.clone(),
reverse_table: self.reverse_table.clone(),
device_user_map: self.device_user_map.clone(),
})),
);
}
}
{
let mut s = state.write().unwrap();
s.network_public_key = net_pubkey;
s.refresh_snapshot();
}
let snap_bytes = state
.read()
.unwrap()
.snapshot
.as_ref()
.map(|s| s.msgpack_bytes.clone());
if let Some(bytes) = snap_bytes {
let _ = self.blob_store.blobs().add_slice(&bytes).await;
}
if let Ok(Some(mut net)) = config::load_network(display_name) {
net.network_public_key = Some(net_pubkey);
let _ = config::save_network(&net);
}
let mut tasks = tasks;
if let Ok(poller_client) = dht::create_pkarr_client(&self.endpoint) {
tasks.push(spawn_group_poller(
poller_client,
net_pubkey,
state.clone(),
self.endpoint.clone(),
self.blob_store.clone(),
self.peers.clone(),
display_name.to_string(),
self.firewall.clone(),
cancel.clone(),
));
}
let handle = NetworkHandle {
name: display_name.to_string(),
network_key: net_pubkey,
role: NetworkRole::Member,
my_ip,
state,
dht_notify: None,
cancel,
tasks,
invite_lock,
disconnect_tx,
};
self.networks.insert(display_name.to_string(), handle);
self.refresh_alpns().await;
dns::update_hostname(
&self.hostname_table,
&self.reverse_table,
display_name,
&my_hostname,
my_ip,
derive_ipv6(&self.identity.local_identity()),
)
.await;
for member in &data.members {
if let Some(ref h) = member.hostname {
dns::update_hostname(
&self.hostname_table,
&self.reverse_table,
display_name,
h,
member.ip,
derive_ipv6(&member.identity),
)
.await;
}
}
tracing::info!(network = %display_name, key = %network_key, ip = %my_ip, "joined network");
Ok(TryJoin::Joined(IpcMessage::Joined {
name: display_name.to_string(),
my_ip,
my_ipv6: Some(derive_ipv6(&self.identity.local_identity())),
}))
}
/// Fetch and verify the network's currently published group blob.
///
/// Resolves the pkarr record for `net_pubkey` to learn the expected blob hash
/// and seed peers. It tries the local blob store first, then each seed peer
/// other than ourselves, in order. Unreachable peers, failed fetches, and
/// bytes that fail `verify_group_blob` are skipped silently.
///
/// # Errors
/// DHT client or resolve failure, or no source yielded a verified blob.
async fn restore_roster_from_blob(
    &self,
    net_pubkey: EndpointId,
) -> Result<crate::membership::GroupBlob> {
    let client = dht::create_pkarr_client(&self.endpoint)?;
    let resolved = dht::resolve_network(&client, net_pubkey).await;
    let (expected_hash, seed_peers) =
        resolved.context("resolve pkarr record for roster restore")?;
    let hash = iroh_blobs::Hash::from_bytes(*expected_hash.as_bytes());

    // Cheapest source first: a copy already in our own store.
    if let Ok(local) = self.blob_store.blobs().get_bytes(hash).await
        && let Ok(blob) = verify_group_blob(&local, &expected_hash)
    {
        return Ok(blob);
    }

    let me = self.endpoint.id();
    for seed in &seed_peers {
        if *seed == me {
            continue;
        }
        let Ok(conn) = transport::connect_to_peer_with_alpn(
            &self.endpoint,
            *seed,
            iroh_blobs::protocol::ALPN,
        )
        .await
        else {
            continue;
        };
        let fetched = self
            .blob_store
            .remote()
            .fetch(conn, HashAndFormat::raw(hash))
            .await;
        if fetched.is_err() {
            continue;
        }
        if let Ok(remote) = self.blob_store.blobs().get_bytes(hash).await
            && let Ok(blob) = verify_group_blob(&remote, &expected_hash)
        {
            return Ok(blob);
        }
    }
    anyhow::bail!("group blob not found locally or at any seed peer");
}
/// Download the group blob `blob_hash` from `peer_id` over the iroh-blobs
/// ALPN into the local store, then read it back and decode it.
///
/// The bytes are decoded with `decode_group_blob`; no further hash check is
/// done here beyond what the blob fetch itself performs.
///
/// # Errors
/// Connect failure, `"blob fetch failed: …"`, `"blob read failed: …"`, or a
/// decode error.
async fn try_fetch_group_blob(
    &self,
    peer_id: EndpointId,
    blob_hash: iroh_blobs::Hash,
) -> Result<crate::membership::GroupBlob> {
    let blobs_alpn = iroh_blobs::protocol::ALPN;
    let conn = transport::connect_to_peer_with_alpn(&self.endpoint, peer_id, blobs_alpn).await?;
    let wanted = HashAndFormat::raw(blob_hash);
    if let Err(e) = self.blob_store.remote().fetch(conn, wanted).await {
        anyhow::bail!("blob fetch failed: {e}");
    }
    let raw = match self.blob_store.blobs().get_bytes(blob_hash).await {
        Ok(bytes) => bytes,
        Err(e) => anyhow::bail!("blob read failed: {e}"),
    };
    crate::membership::decode_group_blob(&raw)
}
/// Fallback member join that bypasses the coordinator. It is currently unused,
/// hence `#[allow(dead_code)]`.
///
/// Resolves the network's pkarr record for the expected group-blob hash. It
/// then walks the members stored in this network's config entry (skipping
/// ourselves) and tries to fetch the blob from each in turn. At the first
/// member whose fetch and local read succeed, it:
/// - verifies the blob;
/// - spawns a reconnect loop;
/// - dials every member listed in the blob;
/// - registers the network as a `Member` with a fresh `NetworkState` built
///   from the blob.
///
/// # Errors
/// - DHT or config-load failure.
/// - The network is missing from config.
/// - The fetched blob fails `verify_group_blob`. This is returned immediately
///   via `?` without trying further members.
/// - No member served the blob.
#[allow(dead_code)]
async fn try_dht_fallback_join(
    &self,
    network_name: &str,
    net_pubkey: EndpointId,
    alpn: &[u8],
) -> Result<IpcMessage> {
    tracing::info!(network = %network_name, "trying DHT fallback");
    let pkarr_client = dht::create_pkarr_client(&self.endpoint)?;
    // Only the hash is used; candidate peers come from the local config below.
    let (expected_hash, _peer_ids) = dht::resolve_network(&pkarr_client, net_pubkey).await?;
    let my_identity = self.identity.local_identity();
    let blob_hash = iroh_blobs::Hash::from_bytes(*expected_hash.as_bytes());
    let app_config = config::load()?;
    let net_config = app_config
        .networks
        .iter()
        .find(|n| n.name == network_name)
        .context("network not in config")?;
    for member in &net_config.members {
        if member.identity == my_identity {
            continue;
        }
        // Unreachable members, failed fetches and failed reads move on to the next member.
        let blobs_conn = match transport::connect_to_peer_with_alpn(
            &self.endpoint,
            member.identity,
            iroh_blobs::protocol::ALPN,
        )
        .await
        {
            Ok(c) => c,
            Err(_) => continue,
        };
        if self
            .blob_store
            .remote()
            .fetch(blobs_conn, HashAndFormat::raw(blob_hash))
            .await
            .is_err()
        {
            continue;
        }
        let blob_bytes = match self.blob_store.blobs().get_bytes(blob_hash).await {
            Ok(bytes) => bytes,
            Err(_) => continue,
        };
        let data = verify_group_blob(&blob_bytes, &expected_hash)?;
        tracing::info!(network = %network_name, members = data.members.len(), "group blob resolved via DHT fallback");
        let my_ip = self.identity.local_ip();
        let my_hostname = net_config.my_hostname.clone();
        let cancel = self.shutdown_token.child_token();
        let (disconnect_tx, disconnect_rx) = mpsc::channel::<forward::DisconnectEvent>(64);
        let tasks = vec![spawn_reconnect_loop(
            disconnect_rx,
            self.endpoint.clone(),
            alpn.to_vec(),
            network_name.to_string(),
            my_identity,
            my_ip,
            my_hostname.clone(),
            self.peers.clone(),
            self.tun_tx.clone(),
            disconnect_tx.clone(),
            cancel.clone(),
            self.stats.clone(),
            self.firewall.clone(),
            self.device_cert.clone(),
            self.device_user_map.clone(),
        )];
        self.dial_all_members(
            &data.members,
            alpn,
            network_name,
            my_identity,
            my_ip,
            my_hostname.clone(),
            disconnect_tx.clone(),
            cancel.clone(),
        )
        .await;
        // NOTE(review): mode is hard-coded to Restricted and suggested_firewall
        // reset to default, ignoring `net_config.group_mode` and
        // `data.suggested_firewall`. DNS entries are not seeded here either,
        // unlike the regular join path. Confirm before re-enabling this path.
        let mut ns = NetworkState {
            members: MemberList::from_members(data.members),
            approved: ApprovedList::from_entries(data.approved),
            snapshot: None,
            network_secret_key: None,
            network_public_key: net_pubkey,
            network_name: data.name.clone(),
            mode: GroupMode::Restricted,
            suggested_firewall: SuggestedFirewall::default(),
            reusable_keys: data.reusable_keys.clone(),
            pending_suggestions: Vec::new(),
            pending: HashMap::new(),
        };
        ns.refresh_snapshot();
        let live_state = Arc::new(std::sync::RwLock::new(ns));
        let handle = NetworkHandle {
            name: network_name.to_string(),
            network_key: net_pubkey,
            role: NetworkRole::Member,
            my_ip,
            state: live_state,
            dht_notify: None,
            cancel,
            tasks,
            invite_lock: Arc::new(tokio::sync::Mutex::new(())),
            disconnect_tx,
        };
        self.networks.insert(network_name.to_string(), handle);
        self.refresh_alpns().await;
        return Ok(IpcMessage::Joined {
            name: network_name.to_string(),
            my_ip,
            my_ipv6: Some(derive_ipv6(&self.identity.local_identity())),
        });
    }
    anyhow::bail!("no peers reachable for DHT fallback")
}
/// Dial every entry in `members` except `my_identity` on the network ALPN
/// and wire each successful connection into the data plane.
///
/// For each reachable member:
/// 1. Send a `MeshHello` on a fresh bi-stream. Failures to open the stream
///    or send the message are ignored.
/// 2. Spawn a path logger.
/// 3. Add the peer to the peer table.
/// 4. Start a reader task for its traffic.
///
/// Dial failures are only logged at debug level; per the log text, the
/// reconnect loop is expected to retry them.
///
/// The hostname announced in `MeshHello` is `outgoing_hostname(network_name)`
/// when that returns `Some`, otherwise `my_hostname`.
#[allow(clippy::too_many_arguments)]
async fn dial_all_members(
    &self,
    members: &[Member],
    alpn: &[u8],
    network_name: &str,
    my_identity: EndpointId,
    my_ip: Ipv4Addr,
    my_hostname: Option<String>,
    disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
    cancel: CancellationToken,
) {
    let announced_hostname = outgoing_hostname(network_name).or(my_hostname);
    let others = members.iter().filter(|m| m.identity != my_identity);
    for peer in others {
        let conn =
            match transport::connect_to_peer_with_alpn(&self.endpoint, peer.identity, alpn).await {
                Ok(c) => c,
                Err(e) => {
                    tracing::debug!(
                        network = %network_name,
                        peer = %peer.identity.fmt_short(),
                        error = %e,
                        "could not dial member yet; reconnect loop will retry"
                    );
                    continue;
                }
            };

        if let Ok((mut send, _)) = conn.open_bi().await {
            let hello = ControlMsg::MeshHello {
                identity: my_identity,
                ip: my_ip,
                hostname: announced_hostname.clone(),
                device_cert: self.device_cert.clone(),
            };
            let _ = control::send_msg(&mut send, &hello).await;
        }

        crate::spawn_path_logger(conn.clone(), peer.identity.fmt_short().to_string());
        self.peers.add(
            peer.ip,
            derive_ipv6(&peer.identity),
            conn.clone(),
            peer.identity,
            network_name,
        );
        forward::spawn_peer_reader(
            conn,
            peer.identity,
            peer.ip,
            derive_ipv6(&peer.identity),
            network_name.to_string(),
            self.firewall.clone(),
            self.tun_tx.clone(),
            disconnect_tx.clone(),
            cancel.clone(),
            self.stats.clone(),
            self.device_user_map.clone(),
        );
        tracing::info!(
            network = %network_name,
            peer = %peer.identity.fmt_short(),
            "dialed known member on restore/join (full mesh)"
        );
    }
}
/// Bring a previously created network back up with this node as coordinator.
///
/// Requires the network's secret key in the persisted config. The roster
/// (members, approved list, suggested firewall, reusable keys) comes from the
/// currently published group blob when `restore_roster_from_blob` succeeds.
/// Otherwise it is rebuilt from the config entries, which may be stale. We
/// add ourselves as coordinator if the roster lacks us.
///
/// The resulting snapshot is stored locally, published to the DHT and
/// written back to config. Then the following are set up, in order:
/// - publisher and cleanup tasks;
/// - the coordinator ALPN handler;
/// - member DNS entries;
/// - dials to every member;
/// - finally, the network handle itself is registered.
///
/// Returns `Ok(IpcMessage::Error)` if the network is already active.
///
/// # Errors
/// Config load or save failure, or no network secret key in config.
async fn restore_coordinator_network(&self, name: &str, mode: GroupMode) -> Result<IpcMessage> {
    {
        if self.networks.contains_key(name) {
            return Ok(IpcMessage::Error {
                message: format!("network '{name}' already active"),
            });
        }
    }
    let my_ip = self.identity.local_ip();
    let app_config = config::load()?;
    // This network's config entry, if any. It supplies the secret key, our
    // persisted hostname, the fallback roster and per-network settings.
    let net_config = app_config.networks.iter().find(|n| n.name == name);
    let net_secret_key = net_config
        .and_then(|nc| nc.network_secret_key.clone())
        .context("no network secret key in config — cannot restore as coordinator")?;
    let net_public_key = net_secret_key.public();
    let persisted_hostname = net_config.and_then(|nc| nc.my_hostname.clone());
    let mut member_list = MemberList::new();
    let mut approved_list = ApprovedList::new();
    let mut suggested_firewall = SuggestedFirewall::default();
    let mut reusable_keys = BTreeMap::new();
    // Prefer the published roster; the config copy may be out of date.
    match self.restore_roster_from_blob(net_public_key).await {
        Ok(data) => {
            suggested_firewall = data.suggested_firewall.clone();
            reusable_keys = data.reusable_keys.clone();
            // add/approve errors are deliberately ignored (best-effort restore).
            for m in &data.members {
                let _ = member_list.add(m.clone());
            }
            for a in &data.approved {
                let _ = approved_list.approve(a.clone(), &member_list);
            }
            tracing::info!(
                network = %name,
                members = member_list.all().len(),
                "restored roster from published group blob"
            );
        }
        Err(e) => {
            tracing::warn!(
                network = %name,
                error = %e,
                "could not restore roster from DHT blob; falling back to config (may be stale)"
            );
            // Config entries only carry identity/ip/(coordinator flag)/hostname,
            // so user identity and device cert are left empty here.
            if let Some(nc) = net_config {
                for entry in &nc.members {
                    let _ = member_list.add(Member {
                        identity: entry.identity,
                        ip: entry.ip,
                        is_coordinator: entry.is_coordinator,
                        hostname: entry.hostname.clone(),
                        user_identity: None,
                        device_cert: None,
                        collision_index: 0,
                    });
                }
                for entry in &nc.approved {
                    let ae = ApprovedEntry {
                        identity: entry.identity,
                        ip: entry.ip,
                        hostname: entry.hostname.clone(),
                        user_identity: None,
                        device_cert: None,
                        collision_index: 0,
                    };
                    let _ = approved_list.approve(ae, &member_list);
                }
            }
        }
    }
    // Guarantee we are on the roster as coordinator.
    if !member_list.is_member(&self.identity.local_identity()) {
        member_list
            .add(Member {
                identity: self.identity.local_identity(),
                ip: my_ip,
                is_coordinator: true,
                hostname: persisted_hostname.clone(),
                user_identity: None,
                device_cert: None,
                collision_index: 0,
            })
            .expect("self-add cannot collide");
    }
    let mut net_state = NetworkState {
        members: member_list,
        approved: approved_list,
        snapshot: None,
        network_secret_key: Some(net_secret_key.clone()),
        network_public_key: net_public_key,
        network_name: Some(name.to_string()),
        mode,
        suggested_firewall,
        reusable_keys,
        pending_suggestions: Vec::new(),
        pending: HashMap::new(),
    };
    net_state.refresh_snapshot();
    // NOTE(review): a failure to store the blob locally is ignored, yet its
    // hash is published just below. Consider logging it.
    if let Some(snap) = &net_state.snapshot {
        let _ = self.blob_store.blobs().add_slice(&snap.msgpack_bytes).await;
    }
    if let Ok(pkarr_client) = dht::create_pkarr_client(&self.endpoint) {
        let blob_hash = net_state
            .snapshot
            .as_ref()
            .map(|s| s.hash)
            .expect("snapshot set");
        if let Err(e) = dht::publish_network(
            &pkarr_client,
            &net_secret_key,
            &blob_hash,
            &[self.endpoint.id()],
        )
        .await
        {
            tracing::warn!(error = %e, "failed to publish network record on restore");
        }
    }
    // Write the (possibly refreshed) roster back to config, carrying over
    // per-network settings from the existing entry when present.
    let member_entries = net_state
        .members
        .all()
        .into_iter()
        .map(|m| config::MemberEntry {
            identity: m.identity,
            ip: m.ip,
            is_coordinator: m.is_coordinator,
            hostname: m.hostname.clone(),
        })
        .collect();
    let approved_entries = net_state
        .approved
        .all()
        .into_iter()
        .map(|a| config::ApprovedConfigEntry {
            identity: a.identity,
            ip: a.ip,
            hostname: a.hostname.clone(),
        })
        .collect();
    config::save_network(&config::NetworkConfig {
        name: name.to_string(),
        group_mode: mode,
        my_ip: Some(my_ip),
        my_hostname: persisted_hostname.clone(),
        pending_hostname: None,
        members: member_entries,
        approved: approved_entries,
        network_secret_key: Some(net_secret_key.clone()),
        network_public_key: Some(net_public_key),
        transport: None,
        auto_accept_firewall: net_config
            .map(|nc| nc.auto_accept_firewall)
            .unwrap_or(false),
        admins: net_config.map(|nc| nc.admins.clone()).unwrap_or_default(),
        direct: net_config.map(|nc| nc.direct).unwrap_or(false),
    })?;
    let cancel = self.shutdown_token.child_token();
    let state = Arc::new(std::sync::RwLock::new(net_state));
    let invite_lock = Arc::new(tokio::sync::Mutex::new(()));
    let mut tasks = Vec::new();
    let dht_notify = Arc::new(tokio::sync::Notify::new());
    if let Ok(pkarr_client) = dht::create_pkarr_client(&self.endpoint) {
        tasks.push(spawn_network_publisher(
            pkarr_client,
            net_secret_key.clone(),
            state.clone(),
            self.endpoint.id(),
            self.peers.clone(),
            name.to_string(),
            dht_notify.clone(),
            cancel.clone(),
        ));
    }
    let (disconnect_tx, disconnect_rx) = mpsc::channel::<forward::DisconnectEvent>(64);
    tasks.push(spawn_peer_cleanup(
        disconnect_rx,
        self.peers.clone(),
        cancel.clone(),
        Some(CoordinatorCleanup {
            state: state.clone(),
            blob_store: self.blob_store.clone(),
            dht_notify: Some(dht_notify.clone()),
            hostname_table: self.hostname_table.clone(),
            reverse_table: self.reverse_table.clone(),
            device_user_map: self.device_user_map.clone(),
            network_name: name.to_string(),
        }),
    ));
    self.register_coordinator_handler(
        name,
        state.clone(),
        invite_lock.clone(),
        Some(dht_notify.clone()),
        net_public_key,
        disconnect_tx.clone(),
        cancel.clone(),
    );
    {
        // Copy hostnames out under the read lock, then release it before awaiting DNS updates.
        let members_snapshot: Vec<_> = {
            let s = state.read().unwrap();
            s.members
                .all()
                .into_iter()
                .filter_map(|m| {
                    m.hostname
                        .as_ref()
                        .map(|h| (h.clone(), m.ip, derive_ipv6(&m.identity)))
                })
                .collect()
        };
        for (hostname, ip, ipv6) in members_snapshot {
            dns::update_hostname(
                &self.hostname_table,
                &self.reverse_table,
                name,
                &hostname,
                ip,
                ipv6,
            )
            .await;
        }
    }
    // Same pattern: clone the roster out of the lock before the async dials.
    let members_to_dial: Vec<Member> = state
        .read()
        .unwrap()
        .members
        .all()
        .into_iter()
        .cloned()
        .collect();
    let alpn = transport::network_alpn(&net_public_key);
    self.dial_all_members(
        &members_to_dial,
        &alpn,
        name,
        self.identity.local_identity(),
        my_ip,
        persisted_hostname.clone(),
        disconnect_tx.clone(),
        cancel.clone(),
    )
    .await;
    let handle = NetworkHandle {
        name: name.to_string(),
        network_key: net_public_key,
        role: NetworkRole::Coordinator,
        my_ip,
        state,
        dht_notify: Some(dht_notify),
        cancel,
        tasks,
        invite_lock,
        disconnect_tx,
    };
    self.networks.insert(name.to_string(), handle);
    self.refresh_alpns().await;
    tracing::info!(name = %name, key = %net_public_key, ip = %my_ip, "network restored (coordinator)");
    Ok(IpcMessage::Created {
        name: name.to_string(),
        network_key: net_public_key,
        my_ip,
        my_ipv6: Some(derive_ipv6(&self.identity.local_identity())),
    })
}
/// Destroy a network this node coordinates and blank its DHT record.
///
/// Only the coordinator may nuke a network. If other members remain, `force`
/// must be set. When the network secret key is held, it publishes a record
/// pointing at the hash of an empty roster with no seed peers, then leaves
/// via `leave_network`. A publish failure is logged and does not prevent
/// leaving.
///
/// Everything needed is read under a single map lookup. A second
/// `networks.get(name).unwrap()` would panic if a concurrent leave removed
/// the entry between the two lookups.
#[tracing::instrument(skip(self), fields(net = name))]
async fn nuke_network(&self, name: &str, force: bool) -> IpcMessage {
    let (is_coordinator, has_other_members, net_secret_key) = {
        let Some(handle) = self.networks.get(name) else {
            return IpcMessage::Error {
                message: format!("not in network '{name}'"),
            };
        };
        let state = handle.state.read().unwrap();
        let my_id = self.endpoint.id();
        let is_coord = state
            .members
            .get(&my_id)
            .is_some_and(|m| m.is_coordinator);
        let others = state.members.all().len() > 1;
        (is_coord, others, state.network_secret_key.clone())
    };
    if !is_coordinator {
        return IpcMessage::Error {
            message: "only the coordinator can nuke a network".to_string(),
        };
    }
    if has_other_members && !force {
        return IpcMessage::Error {
            message: "network has other members — use --force to destroy, or transfer ownership first".to_string(),
        };
    }
    if let Some(key) = net_secret_key
        && let Ok(client) = dht::create_pkarr_client(&self.endpoint)
    {
        let empty_hash = group_blob_hash(
            &MemberList::new(),
            &ApprovedList::new(),
            &SuggestedFirewall::default(),
            None,
            &BTreeMap::new(),
        );
        if let Err(e) = dht::publish_network(&client, &key, &empty_hash, &[]).await {
            tracing::warn!(error = %e, "failed to publish empty network record on nuke");
        }
    }
    self.leave_network(name).await
}
/// Restore every network listed in the persisted config, then republish the
/// contact record.
///
/// Networks whose config holds a secret key are restored as coordinator via
/// `restore_coordinator_network`. The rest are rejoined as members through
/// `join_network_inner` on the non-initial path. Each restore runs in its own
/// spawned task, so this function does not wait for them; outcomes are only
/// logged. Member networks without a stored public key are skipped with a
/// warning. The final log line counts every configured network, including
/// skipped ones.
async fn connect_all_networks(self: &Arc<Self>) {
    let app_config = match config::load() {
        Err(e) => {
            tracing::warn!(error = %e, "failed to load config during connect");
            return;
        }
        Ok(loaded) => loaded,
    };
    let total = app_config.networks.len();

    for net in &app_config.networks {
        if net.network_secret_key.is_some() {
            let (name, mode) = (net.name.clone(), net.group_mode);
            let daemon = Arc::clone(self);
            tokio::spawn(async move {
                let outcome = daemon.restore_coordinator_network(&name, mode).await;
                match outcome {
                    Ok(IpcMessage::Created { name, .. }) => {
                        tracing::info!(network = %name, "restored coordinator network");
                    }
                    Ok(IpcMessage::Error { message }) => {
                        tracing::warn!(network = %name, error = %message, "failed to restore network");
                    }
                    Err(e) => {
                        tracing::warn!(network = %name, error = %e, "failed to restore network");
                    }
                    _ => {}
                }
            });
            continue;
        }

        let name = net.name.clone();
        let persisted_hostname = net.my_hostname.clone();
        let net_auto_accept = net.auto_accept_firewall;
        let Some(net_pubkey) = net.network_public_key.as_ref().map(|k| k.to_string()) else {
            tracing::warn!(network = %name, "no network public key in config, skipping restore");
            continue;
        };
        let daemon = Arc::clone(self);
        tokio::spawn(async move {
            let outcome = daemon
                .join_network_inner(
                    &net_pubkey,
                    Some(&name),
                    persisted_hostname,
                    None,
                    None,
                    net_auto_accept,
                    false,
                )
                .await;
            match outcome {
                Ok(TryJoin::Joined(IpcMessage::Joined { name, my_ip, .. })) => {
                    tracing::info!(network = %name, ip = %my_ip, "restored member network");
                }
                Ok(_) => {}
                Err(e) => {
                    tracing::warn!(network = %name, error = %e, "failed to restore network");
                }
            }
        });
    }

    if let Some(secret) = app_config.contact_secret_key.clone()
        && let Ok(client) = dht::create_pkarr_client(&self.endpoint)
    {
        let endpoint_id = self.endpoint.id();
        tokio::spawn(async move {
            if let Err(e) = dht::publish_contact(&client, &secret, endpoint_id).await {
                tracing::warn!(error = %e, "failed to publish contact record on connect");
            }
        });
    }
    tracing::info!(networks = total, "control plane connected");
}
/// Brings the data plane up ("VPN up").
///
/// If `hostname` is given, it is validated first. It is then persisted as the
/// config's `default_hostname`, even when the data plane is already active.
/// Next the `active` flag is set. Unless it was already set, this:
/// - brings the TUN link up;
/// - installs the IPv6 peer-range, magic-DNS and loopback self routes;
/// - points system DNS at the Magic DNS resolver.
///
/// Returns `IpcMessage::Error` only for an invalid hostname. Other failures
/// are logged; most are also listed as warnings in the returned `Ok` message.
/// `active` stays `true` regardless of those failures.
async fn activate(self: &Arc<Self>, hostname: Option<String>) -> IpcMessage {
if let Some(h) = hostname {
if !crate::hostname::is_valid_hostname(&h) {
return IpcMessage::Error {
message: format!(
"invalid hostname '{h}': use 1-63 lowercase ASCII letters, digits, or hyphens (no leading/trailing hyphen)"
),
};
}
// Persisting the default hostname is best-effort: failures are only logged.
match config::load() {
Ok(mut app_config) => {
app_config.default_hostname = Some(h);
if let Err(e) = config::save_settings(&app_config) {
tracing::warn!(error = %e, "failed to persist default hostname");
}
}
Err(e) => {
tracing::warn!(error = %e, "failed to load config to set default hostname")
}
}
}
// `swap` returns the previous value, so only the first caller proceeds.
if self.active.swap(true, Ordering::SeqCst) {
return IpcMessage::Ok {
message: "already up".into(),
};
}
let mut warnings: Vec<String> = Vec::new();
if let Err(e) = tun::set_link_up(&self.tun_name) {
tracing::warn!(error = %e, "failed to bring TUN interface up");
warnings.push(format!("failed to bring TUN interface up: {e}"));
}
if let Err(e) = tun::route_peer_range(&self.tun_name).await {
tracing::warn!(error = %e, "failed to route 200::/7 into TUN");
warnings.push(format!("failed to route IPv6 peer range into TUN: {e}"));
}
// NOTE(review): unlike the neighbouring route steps, this failure is only
// logged and not added to `warnings` — confirm that is intentional.
if let Err(e) = tun::route_magic_dns(&self.tun_name).await {
tracing::warn!(error = %e, "failed to route magic DNS IP into TUN");
}
let my_v4 = self.identity.local_ip();
let my_v6 = derive_ipv6(&self.identity.local_identity());
if let Err(e) = tun::route_self_loopback(my_v4, my_v6).await {
tracing::warn!(error = %e, "failed to install loopback self-route");
warnings.push(format!("failed to install loopback self-route: {e}"));
}
// Restore any leftover DNS backups (presumably from an earlier run that did
// not revert cleanly) before configuring DNS again.
dns_config::restore_stale_backups();
match dns_config::detect_and_configure(&self.tun_name).await {
Ok(c) => {
// `c` is the detected DNS configurator. Upstreams are resolved from the
// configured override list plus those captured from the system config.
// The closure's `c` below shadows it with the loaded app config.
let captured = c.captured_upstreams();
let dns_override = config::load().map(|c| c.dns_upstreams).unwrap_or_default();
let upstreams = config::resolve_upstreams(&dns_override, captured);
let is_direct = c.name() == "direct-resolv.conf";
#[cfg(target_os = "linux")]
let search = c.search_domains();
tracing::info!(backend = c.name(), resolver_ip = %crate::dns::MAGIC_DNS_V4, upstreams = ?upstreams, "Magic DNS active");
self.resolver.set_upstreams(upstreams);
// Kept so `deactivate` can revert the DNS change later.
*self.dns_configurator.lock().unwrap() = Some(c);
// With the direct resolv.conf backend on Linux, spawn a task that re-asserts
// resolv.conf; its token is stored so `deactivate` can cancel it.
#[cfg(target_os = "linux")]
if is_direct {
let rt = tokio_util::sync::CancellationToken::new();
*self.dns_reassert_token.lock().unwrap() = Some(rt.clone());
tokio::spawn(dns_config::run_resolv_reassert(search, rt));
}
// Mark `is_direct` as used on targets without the re-assert task.
#[cfg(not(target_os = "linux"))]
let _ = is_direct;
}
Err(e) => {
tracing::warn!(error = %e, "failed to configure system DNS (Magic DNS requires manual setup)");
warnings.push(format!(
"failed to configure system DNS, so .ray names won't resolve: {e}"
));
}
}
tracing::info!("data plane activated");
if warnings.is_empty() {
IpcMessage::Ok {
message: "VPN up".into(),
}
} else {
// Report success, followed by one bullet per non-fatal failure.
let mut message = String::from("VPN up, but some things need attention:");
for w in &warnings {
message.push_str("\n - ");
message.push_str(w);
}
IpcMessage::Ok { message }
}
}
async fn deactivate(&self) -> IpcMessage {
if !self.active.swap(false, Ordering::SeqCst) {
return IpcMessage::Ok {
message: "already on standby".into(),
};
}
if let Some(rt) = self.dns_reassert_token.lock().unwrap().take() {
rt.cancel();
}
let configurator = self.dns_configurator.lock().unwrap().take();
if let Some(configurator) = configurator
&& let Err(e) = dns_config::revert(configurator.as_ref()).await
{
tracing::warn!(error = %e, "failed to revert DNS configuration");
}
dns_config::clear_search_domains(&self.tun_name).await;
if let Err(e) = tun::set_link_down(&self.tun_name) {
tracing::warn!(error = %e, "failed to bring TUN interface down");
}
tracing::info!("VPN on standby");
IpcMessage::Ok {
message: "VPN on standby (still connected to peers)".into(),
}
}
async fn teardown_network_runtime(&self, name: &str) -> bool {
let Some(handle) = self.networks.remove(name).map(|(_, v)| v) else {
return false;
};
handle.cancel.cancel();
for task in handle.tasks {
let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
}
self.peers.remove_by_network(name);
dns::remove_network(&self.hostname_table, &self.reverse_table, name).await;
self.protocol_router
.unregister(&transport::network_alpn(&handle.network_key));
self.refresh_alpns().await;
true
}
#[tracing::instrument(skip(self), fields(net = name))]
async fn leave_network(&self, name: &str) -> IpcMessage {
for (_eid, _ip, conn) in self.peers.peers_for_network_with_conn(name) {
conn.close(VarInt::from_u32(forward::LEAVE_CODE), b"leave");
}
let was_active = self.teardown_network_runtime(name).await;
let removed_from_config = config::delete_network(name).unwrap_or(false);
if was_active || removed_from_config {
tracing::info!(network = %name, "left network");
IpcMessage::Ok {
message: format!("left network '{}'", name),
}
} else {
IpcMessage::Error {
message: format!("network '{}' not found", name),
}
}
}
fn status(&self) -> IpcMessage {
let hostname_snapshot = self.hostname_table.try_read().ok();
let my_id = self.endpoint.id();
let direct_names: std::collections::HashSet<String> = config::load()
.map(|c| {
c.networks
.iter()
.filter(|n| n.direct)
.map(|n| n.name.clone())
.collect()
})
.unwrap_or_default();
let statuses: Vec<NetworkStatus> = self
.networks
.iter()
.map(|h| {
let (members, member_count, pending_suggestions, pending_requests) = {
let s = match h.state.read() {
Ok(s) => s,
Err(_) => {
return NetworkStatus {
name: h.name.clone(),
role: if direct_names.contains(&h.name) {
NetworkRole::Direct
} else {
h.role.clone()
},
my_ip: h.my_ip,
my_ipv6: Some(derive_ipv6(&my_id)),
my_hostname: None,
network_key: Some(h.network_key.to_string()),
member_count: 0,
peers: vec![],
pending_suggestions: 0,
pending_requests: 0,
};
}
};
let count = s.members.all().len();
let all = s.members.all().into_iter().cloned().collect::<Vec<_>>();
(all, count, s.pending_suggestions.len(), s.pending.len())
};
let connected: HashMap<EndpointId, Connection> = self
.peers
.peers_for_network_with_conn(&h.name)
.into_iter()
.map(|(eid, _, conn)| (eid, conn))
.collect();
let network_key = Some(h.network_key.to_string());
let peers = members
.iter()
.filter(|m| m.identity != my_id)
.map(|m| {
let hostname = m.hostname.clone().or_else(|| {
hostname_snapshot.as_ref().and_then(|table| {
table.get(&h.name).and_then(|hosts| {
hosts
.iter()
.find(|(_, v)| v.0 == m.ip)
.map(|(k, _)| k.clone())
})
})
});
let connection = connected.get(&m.identity).map(Self::gather_conn_info);
let user_id = self.device_user_map.resolve(&m.identity);
let user_identity = if user_id != m.identity {
Some(user_id)
} else {
None
};
PeerStatus {
endpoint_id: m.identity,
ip: m.ip,
ipv6: Some(derive_ipv6(&m.identity)),
hostname,
user_identity,
connection,
}
})
.collect();
let my_hostname = hostname_snapshot.as_ref().and_then(|table| {
table.get(&h.name).and_then(|hosts| {
hosts
.iter()
.find(|(_, v)| v.0 == h.my_ip)
.map(|(k, _)| k.clone())
})
});
NetworkStatus {
name: h.name.clone(),
role: if direct_names.contains(&h.name) {
NetworkRole::Direct
} else {
h.role.clone()
},
my_ip: h.my_ip,
my_ipv6: Some(derive_ipv6(&self.identity.local_identity())),
my_hostname,
network_key,
member_count,
peers,
pending_suggestions,
pending_requests,
}
})
.collect();
IpcMessage::StatusResponse {
endpoint_id: self.endpoint.id(),
mdns_enabled: self.mdns_enabled,
active: self.active.load(Ordering::SeqCst),
contact_id: Some(self.contact_public.to_string()),
daemon_version: env!("CARGO_PKG_VERSION").to_string(),
networks: statuses,
packets_rx: self.stats.packets_rx.get(),
packets_tx: self.stats.packets_tx.get(),
bytes_rx: self.stats.bytes_rx.get(),
bytes_tx: self.stats.bytes_tx.get(),
pending_files: self.protocol_router.pending_files.lock().unwrap().len(),
pending_connects: self.protocol_router.pending_connects.len(),
}
}
fn build_report(&self, peer_cred: Option<(u32, u32)>) -> IpcMessage {
use std::fmt::Write as _;
let version = env!("CARGO_PKG_VERSION");
let os = std::env::consts::OS;
let arch = std::env::consts::ARCH;
let uname = std::process::Command::new("uname")
.arg("-a")
.output()
.ok()
.and_then(|o| String::from_utf8(o.stdout).ok())
.map(|s| s.trim().to_string())
.unwrap_or_default();
let uptime = self.start.elapsed().as_secs();
let active = self.active.load(Ordering::SeqCst);
let mut sysinfo = String::new();
let _ = writeln!(sysinfo, "rayfish {version}");
let _ = writeln!(sysinfo, "os: {os} arch: {arch}");
if !uname.is_empty() {
let _ = writeln!(sysinfo, "uname: {uname}");
}
let _ = writeln!(sysinfo, "endpoint_id: {}", self.endpoint.id());
let _ = writeln!(sysinfo, "uptime_secs: {uptime}");
let _ = writeln!(sysinfo, "active: {active}");
let _ = writeln!(sysinfo, "networks: {}", self.networks.len());
let snap = self.stats.snapshot(self.start);
let total_drops: u64 = snap.drops.iter().map(|(_, c)| c).sum();
let mut metrics = String::new();
let _ = writeln!(metrics, "packets_rx: {}", snap.packets_rx);
let _ = writeln!(metrics, "packets_tx: {}", snap.packets_tx);
let _ = writeln!(metrics, "bytes_rx: {}", snap.bytes_rx);
let _ = writeln!(metrics, "bytes_tx: {}", snap.bytes_tx);
let _ = writeln!(metrics, "drops_total: {total_drops}");
for (reason, count) in &snap.drops {
let _ = writeln!(metrics, " drop[{reason}]: {count}");
}
let status = format!("{:#?}", self.status());
let mut files: Vec<(String, Vec<u8>)> = vec![
("sysinfo.txt".to_string(), sysinfo.into_bytes()),
("metrics.txt".to_string(), metrics.into_bytes()),
("status.txt".to_string(), status.into_bytes()),
];
files.extend(collect_recent_logs());
let has_panics = files.iter().any(|(name, _)| name == "logs/panic.log");
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let path = std::path::PathBuf::from("/tmp").join(format!("rayfish-report-{ts}.tgz"));
if let Err(e) = write_bundle(&path, &files) {
return IpcMessage::Error {
message: format!("failed to write report bundle: {e}"),
};
}
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o644));
if let Some((uid, gid)) = peer_cred {
use std::os::unix::ffi::OsStrExt;
if let Ok(c) = std::ffi::CString::new(path.as_os_str().as_bytes()) {
unsafe { libc::chown(c.as_ptr(), uid, gid) };
}
}
let issue_title = if has_panics {
format!("[report] crash diagnostics from {os} (rayfish {version})")
} else {
format!("[report] diagnostics from {os} (rayfish {version})")
};
let mut issue_body = String::new();
let _ = writeln!(issue_body, "**rayfish {version}** on {os}/{arch}");
let _ = writeln!(issue_body);
if has_panics {
let _ = writeln!(
issue_body,
"⚠️ One or more panics were recorded — see `logs/panic.log` in the bundle.\n"
);
}
let _ = writeln!(
issue_body,
"Metrics: rx {} pkts / tx {} pkts, {} drops, uptime {}s",
snap.packets_rx, snap.packets_tx, total_drops, uptime
);
let _ = writeln!(issue_body);
let _ = writeln!(
issue_body,
"Diagnostic bundle: `{}` — **please attach this file to the issue.**",
path.display()
);
let _ = writeln!(issue_body);
let _ = writeln!(issue_body, "<!-- Describe what went wrong below. -->");
IpcMessage::ReportBundle {
path: path.display().to_string(),
issue_title,
issue_body,
}
}
/// Summarises a live connection for status and ping output.
///
/// Each path is classified as relay, custom transport (reported as `Tor`) or
/// direct. `choose_path_index` picks the path to report, and its remote address
/// and RTT (in milliseconds) are included. Without a chosen path the type is
/// `Unknown`. Byte/datagram counters and lost packets come from the
/// connection's UDP stats.
fn gather_conn_info(conn: &iroh::endpoint::Connection) -> ipc::ConnectionInfo {
    let paths = conn.paths();
    let mut classes: Vec<(ipc::ConnType, bool)> = Vec::new();
    for path in paths.iter() {
        let remote = path.remote_addr();
        let kind = if remote.is_relay() {
            ipc::ConnType::Relay
        } else if remote.is_custom() {
            ipc::ConnType::Tor
        } else {
            ipc::ConnType::Direct
        };
        classes.push((kind, path.is_selected()));
    }
    let chosen = choose_path_index(&classes)
        .and_then(|idx| paths.iter().nth(idx).map(|p| (idx, p)));
    let (conn_type, remote_addr, rtt_ms) = if let Some((idx, path)) = chosen {
        let millis = path.rtt().as_secs_f64() * 1000.0;
        (
            classes[idx].0.clone(),
            Some(path.remote_addr().to_string()),
            Some(millis),
        )
    } else {
        (ipc::ConnType::Unknown, None, None)
    };
    let udp = conn.stats();
    ipc::ConnectionInfo {
        conn_type,
        remote_addr,
        rtt_ms,
        bytes_tx: udp.udp_tx.bytes,
        bytes_rx: udp.udp_rx.bytes,
        datagrams_tx: udp.udp_tx.datagrams,
        datagrams_rx: udp.udp_rx.datagrams,
        lost_packets: udp.lost_packets,
    }
}
/// Resolves `name` (via `resolve_peer_name`) to the member's mesh IPv4 address
/// plus a display label.
///
/// The label is the member's hostname, or its short id if it has none.
/// Networks are searched in map iteration order, and the first one containing
/// the identity wins. Returns `None` if the name does not resolve or no active
/// network has that member.
async fn resolve_peer_ip(&self, name: &str) -> Option<(Ipv4Addr, String)> {
    let id = self.resolve_peer_name(name).await?;
    self.networks.iter().find_map(|entry| {
        let state = entry.value().state.read().unwrap();
        let members = state.members.all();
        let member = members.iter().find(|m| m.identity == id)?;
        let label = match &member.hostname {
            Some(h) => h.clone(),
            None => id.fmt_short().to_string(),
        };
        Some((member.ip, label))
    })
}
/// Pings `peer` over its existing mesh connection and reports per-probe RTTs.
///
/// `peer` is resolved to a member IPv4 with `resolve_peer_ip`, then to a live
/// route with `peers.lookup_v4`. `count` is clamped to 1..=100 probes, spaced
/// `interval_ms` apart. Each probe:
/// - opens a new bi-directional stream;
/// - sends a `ControlMsg::Ping` with a random nonce;
/// - waits up to 1 s for the matching pong.
///
/// A failed send or a timeout records `None`. The RTT is measured from before
/// the stream is opened, so it includes stream setup.
///
/// Returns `IpcMessage::Error` if the peer is unknown or has no live route.
async fn ping(&self, peer: &str, count: u32, interval_ms: u64) -> IpcMessage {
let (ip, display) = match self.resolve_peer_ip(peer).await {
Some(x) => x,
None => {
return IpcMessage::Error {
message: format!("unknown peer '{peer}'"),
};
}
};
let route = match self.peers.lookup_v4(&ip) {
Some(r) => r,
None => {
return IpcMessage::Error {
message: format!(
"{display} is not connected (no live mesh link to {ip})"
),
};
}
};
let conn = route.conn;
let network = route.network.to_string();
let count = count.clamp(1, 100);
let mut probes: Vec<Option<f64>> = Vec::with_capacity(count as usize);
for seq in 0..count {
if seq > 0 {
tokio::time::sleep(Duration::from_millis(interval_ms)).await;
}
let nonce: u64 = rand::random();
let (tx, rx) = tokio::sync::oneshot::channel();
// The waiter is registered before sending. Presumably the pong handler
// completes `tx` by looking up this nonce, so this ordering avoids missing
// a fast reply.
self.protocol_router.pending_pongs.insert(nonce, tx);
let sent = Instant::now();
let sent_ok = match conn.open_bi().await {
Ok((mut send, _)) => {
control::send_msg(&mut send, &control::ControlMsg::Ping { nonce })
.await
.is_ok()
}
Err(_) => false,
};
let rtt = if sent_ok {
match tokio::time::timeout(Duration::from_secs(1), rx).await {
Ok(Ok(())) => Some(sent.elapsed().as_secs_f64() * 1000.0),
_ => None,
}
} else {
None
};
// Always unregister, whether or not a pong arrived.
self.protocol_router.pending_pongs.remove(&nonce);
probes.push(rtt);
}
// Path info is sampled once, after all probes.
let info = Self::gather_conn_info(&conn);
IpcMessage::PingResponse {
peer_name: display,
conn_type: info.conn_type,
remote_addr: info.remote_addr,
network,
probes,
}
}
/// Reports local network conditions for `netcheck`.
///
/// Always includes the bound UDP port (0 if none) and whether it is the fixed
/// `RAYFISH_LISTEN_PORT`. From iroh's net report it adds UDP reachability,
/// public IPv4/IPv6, and the preferred relay with its lowest measured latency
/// in milliseconds. The report is awaited for up to 3 s; after that, whatever
/// is available is used. If the report names no preferred relay, the first
/// entry of the endpoint's home-relay status is used instead.
async fn netcheck(&self) -> IpcMessage {
    use iroh::Watcher as _;
    let bound_port = self
        .endpoint
        .bound_sockets()
        .first()
        .map(|a| a.port())
        .unwrap_or(0);
    let port_is_fixed = bound_port == transport::RAYFISH_LISTEN_PORT;
    let mut watcher = self.endpoint.net_report();
    let report = match tokio::time::timeout(Duration::from_secs(3), watcher.initialized()).await {
        Ok(r) => Some(r),
        Err(_) => watcher.get(),
    };
    let (udp, public_ipv4, public_ipv6, preferred_relay, relay_latency_ms) = match report {
        None => (false, None, None, None, None),
        Some(r) => {
            let mut relay = None;
            let mut latency = None;
            if let Some(pref) = r.preferred_relay.clone() {
                // Lowest latency measured to the preferred relay, if any.
                latency = r
                    .relay_latency
                    .iter()
                    .filter(|(_, url, _)| **url == pref)
                    .map(|(_, _, d)| d.as_secs_f64() * 1000.0)
                    .reduce(f64::min);
                relay = Some(pref.to_string());
            }
            (
                r.has_udp(),
                r.global_v4.map(|a| a.to_string()),
                r.global_v6.map(|a| a.to_string()),
                relay,
                latency,
            )
        }
    };
    let home_relay = preferred_relay.or_else(|| {
        let status = self.endpoint.home_relay_status().get();
        status.first().map(|s| s.url().to_string())
    });
    IpcMessage::NetcheckResponse {
        bound_port,
        port_is_fixed,
        home_relay,
        relay_latency_ms,
        public_ipv4,
        public_ipv6,
        udp,
    }
}
/// Changes this node's hostname on `network`.
///
/// The name must pass `hostname::is_valid_hostname`. What happens next
/// depends on the role.
///
/// A coordinator:
/// - resolves collisions against other members' hostnames via
///   `hostname::resolve_collision`;
/// - updates its member entry, local DNS and config;
/// - republishes the group snapshot and broadcasts a member sync.
///
/// A non-coordinator:
/// - updates its local member entry, DNS and config, also storing the name as
///   `pending_hostname`;
/// - sends a `MeshHello` with the new name to each connected peer.
/// Per the log message, a later drain covers peers that were not reached.
///
/// Returns the (possibly collision-adjusted) name and its DNS name.
async fn set_hostname(&self, network: &str, hostname: &str) -> IpcMessage {
use crate::hostname;
if !hostname::is_valid_hostname(hostname) {
return IpcMessage::Error {
message: "invalid hostname (lowercase ASCII, 1-63 chars)".to_string(),
};
}
// The DashMap guard from `networks.get()` is a temporary of this statement,
// so it is released before any `.await` below.
let (my_ip, is_coord, state, dht_notify) = match self.networks.get(network) {
Some(h) => (
h.my_ip,
h.role.is_coordinator(),
h.state.clone(),
h.dht_notify.clone(),
),
None => {
return IpcMessage::Error {
message: format!("network '{}' not found", network),
};
}
};
let my_identity = self.endpoint.id();
// Only the coordinator de-duplicates against hostnames already taken by
// other members; a member's requested name is used as-is.
let new_hostname = if is_coord {
let taken: Vec<String> = {
let s = state.read().unwrap();
s.members
.all()
.iter()
.filter(|m| m.identity != my_identity)
.filter_map(|m| m.hostname.clone())
.collect()
};
let taken_refs: Vec<&str> = taken.iter().map(|s| s.as_str()).collect();
hostname::resolve_collision(hostname, &taken_refs)
} else {
hostname.to_string()
};
// Silently skipped if the state lock is poisoned or this node is not in
// the member list.
if let Ok(mut s) = state.write()
&& let Some(me) = s.members.get_mut(&my_identity)
{
me.hostname = Some(new_hostname.clone());
}
// Drop whatever name currently maps to our IP, then register the new one
// (IPv4 and IPv6).
dns::remove_hostname_by_ip(&self.hostname_table, &self.reverse_table, network, my_ip).await;
dns::update_hostname(
&self.hostname_table,
&self.reverse_table,
network,
&new_hostname,
my_ip,
derive_ipv6(&self.identity.local_identity()),
)
.await;
// Config persistence is best-effort: load/save errors are ignored. A
// coordinator's rename is final here, so only members keep a pending intent.
if let Ok(Some(mut net)) = config::load_network(network) {
net.my_hostname = Some(new_hostname.clone());
net.pending_hostname = if is_coord {
None
} else {
Some(new_hostname.clone())
};
let _ = config::save_network(&net);
}
if is_coord {
tracing::info!(
network = %network,
hostname = %new_hostname,
"coordinator renamed self; republishing blob + broadcasting MemberSync"
);
update_snapshot_and_publish(&state, &self.blob_store, &dht_notify).await;
broadcast_member_sync(&self.peers, None).await;
} else {
let peers = self.peers.peers_for_network_with_conn(network);
tracing::info!(
network = %network,
hostname = %new_hostname,
connected_peers = peers.len(),
"member rename queued as pending intent; sending MeshHello to connected peers"
);
// Best-effort fast path: peers whose stream open or send fails are just
// not counted in `sent`.
let mut sent = 0usize;
for (_peer_id, _peer_ip, conn) in &peers {
if let Ok((mut send, _recv)) = conn.open_bi().await {
let msg = ControlMsg::MeshHello {
identity: my_identity,
ip: my_ip,
hostname: Some(new_hostname.clone()),
device_cert: self.device_cert.clone(),
};
if control::send_msg(&mut send, &msg).await.is_ok() {
sent += 1;
}
}
}
tracing::debug!(
network = %network,
hostname = %new_hostname,
sent,
connected_peers = peers.len(),
"fast-path rename MeshHello delivered; drain backstop covers the rest"
);
}
let dns_name = format!("{}.{}.{}", new_hostname, network, crate::DNS_DOMAIN);
IpcMessage::Ok {
message: format!("hostname set to {} ({})", new_hostname, dns_name),
}
}
/// Resolves a member id prefix against the members of every active network.
///
/// `"self"` maps to this node's endpoint id. Otherwise `short` must be a
/// non-empty prefix of exactly one member identity; a member that appears in
/// several networks counts once. Empty or ambiguous prefixes return `None`.
/// Previously the first match in iteration order won, which let callers such
/// as `admin_add` (which hands out the network secret key) act on an
/// arbitrary member.
fn resolve_short_id_any_network(&self, short: &str) -> Option<EndpointId> {
    if short == "self" {
        return Some(self.endpoint.id());
    }
    // An empty prefix matches every identity.
    if short.is_empty() {
        return None;
    }
    let mut resolved: Option<EndpointId> = None;
    for entry in self.networks.iter() {
        let state = entry.value().state.read().unwrap();
        for m in state.members.all() {
            if !m.identity.to_string().starts_with(short) {
                continue;
            }
            match resolved {
                // A second, different identity also matches: ambiguous.
                Some(prev) if prev != m.identity => return None,
                _ => resolved = Some(m.identity),
            }
        }
    }
    resolved
}
#[allow(clippy::result_large_err)]
fn coordinator_handle(
&self,
network: &str,
) -> std::result::Result<(EndpointId, Arc<tokio::sync::Mutex<()>>), IpcMessage> {
let Some(handle) = self.networks.get(network) else {
return Err(IpcMessage::Error {
message: format!("network '{network}' not active"),
});
};
if !handle.role.is_coordinator() {
return Err(IpcMessage::Error {
message: format!("only the coordinator of '{network}' can manage invites/requests"),
});
}
Ok((handle.network_key, handle.invite_lock.clone()))
}
/// Creates an invite code for `network`.
///
/// With `reusable`, this delegates to `reusable_key_create`. Otherwise the
/// caller must be the network's coordinator (see `coordinator_handle`), and a
/// single-use invite is minted in the on-disk `InviteStore` under the
/// network's invite lock. The invite is valid for `expires_secs` and is
/// optionally bound to `hostname`.
///
/// The returned code encodes the network public key, this node's endpoint id
/// and the invite secret. Only the secret's hash is then gossiped to other
/// coordinators via `ControlMsg::InviteShare` (presumably so they can validate
/// redemptions too).
async fn invite_create(
&self,
network: &str,
expires_secs: u64,
hostname: Option<String>,
reusable: bool,
) -> IpcMessage {
if reusable {
return self
.reusable_key_create(network, expires_secs, hostname)
.await;
}
let (net_pubkey, lock) = match self.coordinator_handle(network) {
Ok(v) => v,
Err(e) => return e,
};
// Load-and-mint happens under the invite lock, serialising access to the
// on-disk store.
let minted = {
let _guard = lock.lock().await;
match crate::invite::InviteStore::load(network) {
Ok(mut store) => store.mint(Duration::from_secs(expires_secs), hostname),
Err(e) => Err(e),
}
};
match minted {
Ok((secret, id)) => {
let code =
crate::invite::encode_invite_code(&net_pubkey, &self.endpoint.id(), &secret);
let secret_hash = crate::invite::hash_secret(&secret);
let expires = now_secs().saturating_add(expires_secs);
// If the network disappeared meanwhile, the invite is still returned but
// not gossiped.
if let Some(handle) = self.networks.get(network) {
let members: Vec<crate::membership::Member> = handle
.state
.read()
.unwrap()
.members
.all()
.into_iter()
.cloned()
.collect();
let me = self.endpoint.id();
// Release the DashMap guard before awaiting the gossip.
drop(handle);
gossip_to_coordinators(
&self.peers,
network,
&members,
me,
&ControlMsg::InviteShare {
id: id.clone(),
secret_hash: secret_hash.into_bytes(),
expires,
},
)
.await;
}
IpcMessage::InviteCreated {
code,
id,
expires_secs,
}
}
Err(e) => IpcMessage::Error {
message: format!("failed to mint invite: {e:#}"),
},
}
}
/// Mints a multi-use join key for `network` and returns its invite code.
///
/// A reusable key cannot bind a hostname, and only a node holding the network
/// secret key may mint one. The key is stored in the network state, keyed by
/// its secret hash. The group snapshot is republished before the code is
/// returned.
async fn reusable_key_create(
    &self,
    network: &str,
    expires_secs: u64,
    hostname: Option<String>,
) -> IpcMessage {
    if hostname.is_some() {
        return IpcMessage::Error {
            message: "a reusable key cannot bind a hostname (a multi-use key admits many \
                machines); drop --hostname or omit --reusable"
                .to_string(),
        };
    }
    // The DashMap entry guard is confined to this block, so it is released
    // before the `.await` below.
    let (state, dht_notify, net_pubkey, has_key) = {
        let Some(h) = self.networks.get(network) else {
            return IpcMessage::Error {
                message: format!("network '{network}' not active"),
            };
        };
        let holds_secret = h.state.read().unwrap().network_secret_key.is_some();
        (h.state.clone(), h.dht_notify.clone(), h.network_key, holds_secret)
    };
    if !has_key {
        return IpcMessage::Error {
            message: "only a coordinator (network key holder) can mint a reusable key"
                .to_string(),
        };
    }
    let secret = crate::invite::generate_secret();
    let (secret_hash, new_key) =
        crate::membership::ReusableKey::from_secret(&secret, now_secs(), expires_secs);
    let id = new_key.id.clone();
    {
        let mut guard = state.write().unwrap();
        guard.reusable_keys.insert(secret_hash, new_key);
    }
    update_snapshot_and_publish(&state, &self.blob_store, &dht_notify).await;
    IpcMessage::InviteCreated {
        code: crate::invite::encode_invite_code(&net_pubkey, &self.endpoint.id(), &secret),
        id,
        expires_secs,
    }
}
async fn invite_list(&self, network: &str) -> IpcMessage {
let (lock, has_key, reusable) = {
let Some(handle) = self.networks.get(network) else {
return IpcMessage::Error {
message: format!("network '{network}' not active"),
};
};
let s = handle.state.read().unwrap();
(
handle.invite_lock.clone(),
s.network_secret_key.is_some(),
s.reusable_keys.clone(),
)
};
if !has_key {
return IpcMessage::Error {
message: format!(
"only a coordinator (network key holder) can list invites for '{network}'"
),
};
}
let mut invites: Vec<ipc::InviteInfo> = Vec::new();
{
let _guard = lock.lock().await;
if let Ok(store) = crate::invite::InviteStore::load(network) {
for v in store.list() {
invites.push(ipc::InviteInfo {
id: v.id,
status: v.status,
created: v.created,
expires: v.expires,
redeemer: v.redeemer,
hostname: v.hostname,
reusable: false,
});
}
}
}
let now = now_secs();
for k in reusable.values() {
let status = if k.revoked {
"revoked"
} else if now >= k.expires {
"expired"
} else {
"active"
};
invites.push(ipc::InviteInfo {
id: k.id.clone(),
status: status.to_string(),
created: k.created,
expires: k.expires,
redeemer: None,
hostname: None,
reusable: true,
});
}
IpcMessage::InviteListResponse { invites }
}
/// Revokes an invite or reusable key on `network` by id.
///
/// Requires the network secret key. `id` is first tried as a reusable key in
/// the in-memory state; on success the group snapshot is republished.
/// Otherwise `id` is revoked in the on-disk `InviteStore` under the invite
/// lock, and any error from that is returned.
async fn invite_revoke(&self, network: &str, id: &str) -> IpcMessage {
// Copy what is needed out of the DashMap entry so its guard is dropped
// before any `.await`.
let (state, dht_notify, lock, has_key) = {
let Some(handle) = self.networks.get(network) else {
return IpcMessage::Error {
message: format!("network '{network}' not active"),
};
};
let has_key = handle.state.read().unwrap().network_secret_key.is_some();
(
handle.state.clone(),
handle.dht_notify.clone(),
handle.invite_lock.clone(),
has_key,
)
};
if !has_key {
return IpcMessage::Error {
message: format!(
"only a coordinator (network key holder) can revoke invites for '{network}'"
),
};
}
// Any error from `revoke_reusable` (not only "unknown id") falls through to
// the single-use store below; the reusable-key error itself is discarded.
let revoked_reusable = {
let mut s = state.write().unwrap();
crate::membership::revoke_reusable(&mut s.reusable_keys, id).is_ok()
};
if revoked_reusable {
update_snapshot_and_publish(&state, &self.blob_store, &dht_notify).await;
return IpcMessage::Ok {
message: format!("revoked reusable key '{id}' (propagating to all admins)"),
};
}
let result = {
let _guard = lock.lock().await;
match crate::invite::InviteStore::load(network) {
Ok(mut store) => store.revoke(id),
Err(e) => Err(e),
}
};
match result {
Ok(()) => IpcMessage::Ok {
message: format!("revoked invite '{id}'"),
},
Err(e) => IpcMessage::Error {
message: format!("{e:#}"),
},
}
}
/// Lists the join requests waiting for approval on `network`.
///
/// Only the coordinator of an active network has pending requests. Each entry
/// reports the requester's short id, the requested hostname, and how long it
/// has been waiting in whole seconds.
fn list_requests(&self, network: &str) -> IpcMessage {
    let handle = match self.networks.get(network) {
        Some(h) => h,
        None => {
            return IpcMessage::Error {
                message: format!("network '{network}' not active"),
            };
        }
    };
    if !handle.role.is_coordinator() {
        return IpcMessage::Error {
            message: format!("only the coordinator of '{network}' has join requests"),
        };
    }
    let state = handle.state.read().unwrap();
    let requests = state
        .pending
        .iter()
        .map(|(requester, join)| {
            let waited = join.requested_at.elapsed();
            ipc::PendingRequestInfo {
                short_id: requester.fmt_short().to_string(),
                hostname: join.hostname.clone(),
                waiting_secs: waited.as_secs(),
            }
        })
        .collect();
    IpcMessage::PendingRequests { requests }
}
/// Approves a pending join request on `network`.
///
/// `id_prefix` must be a non-empty prefix of exactly one pending requester's
/// short id or full id. An empty or ambiguous prefix is rejected rather than
/// approving whichever request happens to come first in map iteration order.
///
/// For the matched request:
/// 1. it is removed from the pending set;
/// 2. an IP is assigned and the member is added to the approved list;
/// 3. the group snapshot is refreshed, then stored and published;
/// 4. a `MemberApproved` message is broadcast to connected peers.
async fn accept_request(&self, network: &str, id_prefix: &str) -> IpcMessage {
    if let Err(e) = self.coordinator_handle(network) {
        return e;
    }
    // An empty prefix matches every request.
    if id_prefix.is_empty() {
        return IpcMessage::Error {
            message: format!("no pending request matching '{id_prefix}'"),
        };
    }
    let pending = {
        let Some(handle) = self.networks.get(network) else {
            return IpcMessage::Error {
                message: format!("network '{network}' not active"),
            };
        };
        let mut s = handle.state.write().unwrap();
        // Look at up to two matches: exactly one is a hit, two means the
        // prefix does not identify a single requester.
        let (first, second) = {
            let mut hits = s
                .pending
                .keys()
                .filter(|k| {
                    k.fmt_short().to_string().starts_with(id_prefix)
                        || k.to_string().starts_with(id_prefix)
                })
                .copied();
            (hits.next(), hits.next())
        };
        match (first, second) {
            (Some(_), Some(_)) => {
                return IpcMessage::Error {
                    message: format!(
                        "'{id_prefix}' matches more than one pending request; use a longer id"
                    ),
                };
            }
            (Some(id), None) => s.pending.remove(&id).map(|pj| (id, pj)),
            _ => None,
        }
    };
    let Some((identity, pj)) = pending else {
        return IpcMessage::Error {
            message: format!("no pending request matching '{id_prefix}'"),
        };
    };
    let user_id = pj.device_cert.as_ref().map(|c| c.user_identity);
    let ip = {
        let Some(handle) = self.networks.get(network) else {
            return IpcMessage::Error {
                message: format!("network '{network}' not active"),
            };
        };
        let mut s = handle.state.write().unwrap();
        let (ip, collision_index) = crate::membership::assign_ip(&s.members, &identity);
        let members = s.members.clone();
        // NOTE(review): the result of `approve` is discarded, so a rejected
        // approval would still be published and broadcast below — confirm.
        let _ = s.approved.approve(
            ApprovedEntry {
                identity,
                ip,
                hostname: pj.hostname.clone(),
                user_identity: user_id,
                device_cert: pj.device_cert.clone(),
                collision_index,
            },
            &members,
        );
        s.refresh_snapshot();
        ip
    };
    self.store_and_publish_group(network).await;
    broadcast_control_msg(
        &self.peers,
        &ControlMsg::MemberApproved {
            identity,
            ip,
            hostname: pj.hostname.clone(),
            device_cert: pj.device_cert.clone(),
        },
    )
    .await;
    IpcMessage::Ok {
        message: format!("accepted {} — they'll join shortly", identity.fmt_short()),
    }
}
fn deny_request(&self, network: &str, id_prefix: &str) -> IpcMessage {
if let Err(e) = self.coordinator_handle(network) {
return e;
}
let Some(handle) = self.networks.get(network) else {
return IpcMessage::Error {
message: format!("network '{network}' not active"),
};
};
let mut s = handle.state.write().unwrap();
let found = s
.pending
.keys()
.find(|k| {
k.fmt_short().to_string().starts_with(id_prefix)
|| k.to_string().starts_with(id_prefix)
})
.copied();
match found {
Some(id) => {
s.pending.remove(&id);
IpcMessage::Ok {
message: format!("denied {}", id.fmt_short()),
}
}
None => IpcMessage::Error {
message: format!("no pending request matching '{id_prefix}'"),
},
}
}
/// Grants network-admin rights on `network` to the member `identity_str`.
///
/// `identity_str` is resolved with `resolve_short_id_any_network`, which
/// searches all active networks. Only a node holding the network secret key
/// can grant.
///
/// The secret key is sent in a `ControlMsg::AdminGrant` over a new stream on
/// the existing connection to that member. The call then waits up to 5 s for
/// that connection to close. Afterwards:
/// - the member is marked coordinator;
/// - the group is stored and published;
/// - the identity is appended to the config's `admins` list.
///
/// Returns an error if:
/// - the identity cannot be resolved;
/// - this node lacks the key;
/// - there is no live connection to the member;
/// - opening the stream or sending fails.
async fn admin_add(&self, network: &str, identity_str: &str) -> IpcMessage {
let Some(identity) = self.resolve_short_id_any_network(identity_str) else {
return IpcMessage::Error {
message: format!(
"could not resolve identity '{identity_str}' (use a short id of a joined member)"
),
};
};
let (net_pubkey, net_secret_key) = match self.networks.get(network) {
Some(h) => {
let key = {
let s = h.state.read().unwrap();
s.network_secret_key.clone()
};
if key.is_none() {
return IpcMessage::Error {
message: "only a coordinator (network key holder) can grant admin"
.to_string(),
};
}
(h.network_key, key)
}
None => {
return IpcMessage::Error {
message: format!("network '{network}' not active"),
};
}
};
// Not expected to trigger: a missing key already returned above.
let Some(net_secret_key) = net_secret_key else {
return IpcMessage::Error {
message: "network key not available".to_string(),
};
};
// The grant is only sent over an already-established connection on this
// network.
let conn = self
.peers
.peers_for_network_with_conn(network)
.into_iter()
.find(|(id, _, _)| *id == identity)
.map(|(_, _, c)| c)
.ok_or_else(|| IpcMessage::Error {
message: format!(
"could not find an active connection to {identity} on '{network}'"
),
});
let conn = match conn {
Ok(c) => c,
Err(e) => return e,
};
let grant = ControlMsg::AdminGrant {
network_pubkey: net_pubkey,
secret_key: net_secret_key.to_bytes(),
};
match conn.open_bi().await {
Ok((mut send, _)) => match control::send_msg(&mut send, &grant).await {
Ok(()) => {
// NOTE(review): presumably the grantee closes the connection once it has
// applied the grant. The member is marked coordinator below even if this
// 5 s wait times out — confirm that is intended.
let _ = tokio::time::timeout(Duration::from_secs(5), conn.closed()).await;
}
Err(e) => {
return IpcMessage::Error {
message: format!("failed to send admin grant: {e}"),
};
}
},
Err(e) => {
return IpcMessage::Error {
message: format!("failed to open stream to {identity}: {e}"),
};
}
}
{
let Some(handle) = self.networks.get(network) else {
return IpcMessage::Error {
message: format!("network '{network}' not active"),
};
};
let mut s = handle.state.write().unwrap();
crate::membership::mark_coordinator(&mut s.members, &identity);
s.refresh_snapshot();
}
self.store_and_publish_group(network).await;
// Best-effort: config load/save errors are ignored.
if let Ok(Some(mut net)) = config::load_network(network)
&& !net.admins.contains(&identity)
{
net.admins.push(identity);
let _ = config::save_network(&net);
}
IpcMessage::Ok {
message: format!("granted network key to {}", identity.fmt_short()),
}
}
/// Lists the admins of `network` known to this node.
///
/// This node is listed first (as `self_node`) if it holds the network secret
/// key. It is followed by every identity in the config's `admins` list for
/// that network. An empty result is reported as an error.
fn admin_list(&self, network: &str) -> IpcMessage {
    let holds_key = self
        .networks
        .get(network)
        .is_some_and(|h| h.state.read().unwrap().network_secret_key.is_some());
    let mut admins: Vec<ipc::AdminInfo> = Vec::new();
    if holds_key {
        admins.push(ipc::AdminInfo {
            short_id: self.endpoint.id().fmt_short().to_string(),
            self_node: true,
        });
    }
    if let Ok(cfg) = config::load()
        && let Some(net) = cfg.networks.iter().find(|n| n.name == network)
    {
        admins.extend(net.admins.iter().map(|id| ipc::AdminInfo {
            short_id: id.fmt_short().to_string(),
            self_node: false,
        }));
    }
    // Holding the key always adds an entry, so "empty" means neither this
    // node nor config knows of any admin.
    if admins.is_empty() {
        return IpcMessage::Error {
            message: format!("network '{network}' not found or not a coordinator"),
        };
    }
    IpcMessage::AdminListResponse { admins }
}
/// Returns the name of an active direct network that already contains `peer`,
/// either as a member or as an approved entry.
///
/// A network counts as direct if config marks it `direct`. If config cannot
/// be loaded, no network counts. Networks whose state lock is poisoned are
/// skipped.
fn existing_direct_network_with(&self, peer: &EndpointId) -> Option<String> {
    let direct_names: HashSet<String> = match config::load() {
        Ok(cfg) => cfg
            .networks
            .iter()
            .filter(|n| n.direct)
            .map(|n| n.name.clone())
            .collect(),
        Err(_) => HashSet::new(),
    };
    self.networks
        .iter()
        .filter(|h| direct_names.contains(h.key()))
        .find_map(|h| {
            let state = h.state.read().ok()?;
            let knows_peer = state.members.all().iter().any(|m| &m.identity == peer)
                || state.approved.all().iter().any(|a| &a.identity == peer);
            knows_peer.then(|| h.key().clone())
        })
}
/// Sends a contact connect request to `peer` and returns its reply.
///
/// Opens a `CONNECT_ALPN` connection and sends a `ConnectMsg::Request` with
/// this node's contact id, endpoint id and optional `hostname`. It then reads
/// back a single framed `ConnectMsg`.
///
/// # Errors
/// Fails if connecting, opening the stream, sending or receiving fails.
async fn send_connect_request(
    &self,
    peer: EndpointId,
    hostname: Option<String>,
) -> Result<control::ConnectMsg> {
    let request = control::ConnectMsg::Request {
        from_contact_id: self.contact_public,
        from_endpoint: self.endpoint.id(),
        hostname,
    };
    let conn =
        transport::connect_to_peer_with_alpn(&self.endpoint, peer, transport::CONNECT_ALPN)
            .await?;
    let (mut outbound, mut inbound) = conn.open_bi().await?;
    control::send_framed(&mut outbound, &request).await?;
    control::recv_framed::<control::ConnectMsg>(&mut inbound).await
}
/// Joins the direct (contact-to-contact) network `room_id` through
/// `coordinator`. On success, the network is flagged `direct` in config.
///
/// Returns the join response unchanged. A failure to update config is
/// ignored.
async fn join_direct(
    self: &Arc<Self>,
    room_id: EndpointId,
    coordinator: EndpointId,
    hostname: Option<String>,
) -> IpcMessage {
    let room_key = room_id.to_string();
    let response = self
        .join_network(&room_key, None, hostname, None, Some(coordinator), false)
        .await;
    let joined_name = match &response {
        IpcMessage::Joined { name, .. } => Some(name),
        _ => None,
    };
    if let Some(name) = joined_name
        && let Ok(Some(mut net_cfg)) = config::load_network(name)
    {
        net_cfg.direct = true;
        let _ = config::save_network(&net_cfg);
    }
    response
}
fn spawn_connect_retry(self: &Arc<Self>, peer: EndpointId, hostname: Option<String>) {
let me = Arc::clone(self);
tokio::spawn(async move {
let mut backoff = BACKOFF_INITIAL;
loop {
tokio::select! {
_ = me.shutdown_token.cancelled() => return,
_ = tokio::time::sleep(backoff) => {}
}
backoff = (backoff * 2).min(BACKOFF_MAX);
match me.send_connect_request(peer, hostname.clone()).await {
Ok(control::ConnectMsg::Approved {
room_id,
coordinator,
}) => {
let _ = me.join_direct(room_id, coordinator, hostname.clone()).await;
me.protocol_router.outgoing_connects.remove(&peer);
return;
}
Ok(control::ConnectMsg::Denied { reason }) => {
tracing::warn!(reason, peer = %peer.fmt_short(), "connect request denied");
me.protocol_router.outgoing_connects.remove(&peer);
return;
}
_ => {} }
}
});
}
/// Requests a direct connection to the contact `contact_id`.
///
/// Steps:
/// 1. Resolve the contact id to an endpoint via pkarr. If a direct network
///    with that peer already exists, report it.
/// 2. Otherwise, record the peer in `outgoing_connects` and send a connect
///    request.
///
/// Outcome by reply:
/// - `Approved`: join the direct network immediately.
/// - `Pending`: hand off to `spawn_connect_retry`.
/// - `Denied`, an unexpected reply, or a transport error: report an error.
///
/// Every terminal outcome handled here removes the peer from
/// `outgoing_connects` again.
async fn connect(self: &Arc<Self>, contact_id: &str, hostname: Option<String>) -> IpcMessage {
    let contact_pubkey = match contact_id.parse::<EndpointId>() {
        Ok(id) => id,
        Err(e) => {
            return IpcMessage::Error {
                message: format!("invalid contact id: {e}"),
            };
        }
    };
    if contact_pubkey == self.contact_public {
        return IpcMessage::Error {
            message: "cannot connect to your own contact id".to_string(),
        };
    }
    let pkarr = match dht::create_pkarr_client(&self.endpoint) {
        Ok(c) => c,
        Err(e) => {
            return IpcMessage::Error {
                message: format!("failed to create pkarr client: {e}"),
            };
        }
    };
    let peer = match dht::resolve_contact(&pkarr, contact_pubkey).await {
        Ok(id) => id,
        Err(_) => {
            return IpcMessage::Error {
                message: "contact offline or unknown (could not resolve contact id)"
                    .to_string(),
            };
        }
    };
    if let Some(name) = self.existing_direct_network_with(&peer) {
        return IpcMessage::Ok {
            message: format!("already connected to this peer on '{name}'"),
        };
    }
    self.protocol_router.outgoing_connects.insert(peer);
    match self.send_connect_request(peer, hostname.clone()).await {
        Ok(control::ConnectMsg::Approved {
            room_id,
            coordinator,
        }) => {
            self.protocol_router.outgoing_connects.remove(&peer);
            self.join_direct(room_id, coordinator, hostname).await
        }
        Ok(control::ConnectMsg::Pending) => {
            // The retry task owns removing the entry from now on.
            self.spawn_connect_retry(peer, hostname);
            IpcMessage::Ok {
                message: "connect request sent — waiting for approval".to_string(),
            }
        }
        Ok(control::ConnectMsg::Denied { reason }) => {
            self.protocol_router.outgoing_connects.remove(&peer);
            IpcMessage::Error {
                message: format!("connection denied: {reason}"),
            }
        }
        Ok(_) => {
            // Terminal like the other failures; without this the peer stayed
            // in `outgoing_connects` indefinitely.
            self.protocol_router.outgoing_connects.remove(&peer);
            IpcMessage::Error {
                message: "unexpected response from contact".to_string(),
            }
        }
        Err(e) => {
            self.protocol_router.outgoing_connects.remove(&peer);
            IpcMessage::Error {
                message: format!("failed to reach contact: {e}"),
            }
        }
    }
}
/// Lists incoming connect requests awaiting local approval.
///
/// Each entry carries the requester's short contact id, its hostname, and how
/// long (in whole seconds) the request has been waiting.
fn list_connections(&self) -> IpcMessage {
    let now = Instant::now();
    let requests = self
        .protocol_router
        .pending_connects
        .iter()
        .map(|entry| {
            let pending = entry.value();
            let waited = now.saturating_duration_since(pending.requested_at);
            ipc::PendingRequestInfo {
                short_id: pending.from_contact_id.fmt_short().to_string(),
                hostname: pending.hostname.clone(),
                waiting_secs: waited.as_secs(),
            }
        })
        .collect();
    IpcMessage::PendingRequests { requests }
}
/// Builds a unique local network name for a direct connection.
///
/// The name has the form `<my_host>-<peer_hostname>`, using `"peer"` when the
/// peer sent no hostname. It is capped at 63 bytes (the DNS label limit) and
/// falls back to a generated name when the result is not a valid hostname. It
/// is then made unique against the networks this daemon already has.
fn direct_network_name(&self, my_host: &str, peer_hostname: Option<&str>) -> String {
    /// Maximum length of a single DNS label, in bytes.
    const MAX_LABEL_LEN: usize = 63;
    let peer = peer_hostname.unwrap_or("peer");
    let mut base = format!("{my_host}-{peer}");
    if base.len() > MAX_LABEL_LEN {
        // `peer_hostname` is supplied by the remote peer and may contain
        // multi-byte UTF-8. `String::truncate` panics unless the cut falls on a
        // char boundary, so back off to the nearest one. Index 0 is always a
        // boundary, so the loop terminates.
        let mut cut = MAX_LABEL_LEN;
        while !base.is_char_boundary(cut) {
            cut -= 1;
        }
        base.truncate(cut);
        base = base.trim_end_matches('-').to_string();
    }
    if !crate::hostname::is_valid_hostname(&base) {
        base = crate::network_name::generate_name();
    }
    let taken: Vec<String> = self.networks.iter().map(|h| h.key().clone()).collect();
    let taken_refs: Vec<&str> = taken.iter().map(|s| s.as_str()).collect();
    crate::hostname::resolve_collision(&base, &taken_refs)
}
/// Approves the pending incoming connect request whose contact id starts with
/// `id_prefix`.
///
/// The prefix is matched against either the short id or the full id. It must
/// identify exactly one request; an ambiguous prefix is rejected instead of
/// approving an arbitrary one, since map iteration order is unspecified.
///
/// Possible outcomes:
/// - The peer already shares a direct network with us: the request is dropped.
/// - Both sides initiated simultaneously: the side with the lexically smaller
///   endpoint id defers to the other.
/// - Otherwise: a restricted direct network is created with the requester
///   pre-approved, and the approval (network key + coordinator) is recorded in
///   `approved_connects` for the requester's retry loop to pick up.
async fn approve_connection(self: &Arc<Self>, id_prefix: &str) -> IpcMessage {
    let mut candidates: Vec<_> = self
        .protocol_router
        .pending_connects
        .iter()
        .filter(|p| {
            p.from_contact_id
                .fmt_short()
                .to_string()
                .starts_with(id_prefix)
                || p.from_contact_id.to_string().starts_with(id_prefix)
        })
        .map(|p| p.value().clone())
        .collect();
    if candidates.len() > 1 {
        return IpcMessage::Error {
            message: format!(
                "'{id_prefix}' matches {} pending connection requests; use a longer id prefix",
                candidates.len()
            ),
        };
    }
    let Some(req) = candidates.pop() else {
        return IpcMessage::Error {
            message: format!("no pending connection request matching '{id_prefix}'"),
        };
    };
    let peer = req.from_endpoint;
    if let Some(name) = self.existing_direct_network_with(&peer) {
        self.protocol_router.pending_connects.remove(&peer);
        return IpcMessage::Ok {
            message: format!("already connected to this peer on '{name}'"),
        };
    }
    // Simultaneous connect: both sides sent requests. Exactly one side (the
    // larger endpoint id) creates the network so only one is created.
    let we_initiated = self.protocol_router.outgoing_connects.contains(&peer);
    if we_initiated && self.endpoint.id().to_string() < peer.to_string() {
        self.protocol_router.pending_connects.remove(&peer);
        return IpcMessage::Ok {
            message: "connection will be established by the other peer".to_string(),
        };
    }
    let my_host = config::load()
        .ok()
        .and_then(|c| c.default_hostname)
        .unwrap_or_else(crate::hostname::generate_hostname);
    let name = self.direct_network_name(&my_host, req.hostname.as_deref());
    match self
        .create_network_inner(
            GroupMode::Restricted,
            Some(name),
            Some(my_host),
            true,
            Some((peer, req.hostname.clone())),
        )
        .await
    {
        Ok(IpcMessage::Created {
            name, network_key, ..
        }) => {
            self.protocol_router.pending_connects.remove(&peer);
            self.protocol_router
                .approved_connects
                .insert(peer, (network_key, self.endpoint.id()));
            IpcMessage::Ok {
                message: format!("approved — direct connection '{name}' created"),
            }
        }
        Ok(other) => other,
        Err(e) => IpcMessage::Error {
            message: format!("failed to create direct network: {e:#}"),
        },
    }
}
async fn rotate_contact(&self) -> IpcMessage {
let mut cfg = match config::load() {
Ok(c) => c,
Err(e) => {
return IpcMessage::Error {
message: format!("failed to load config: {e}"),
};
}
};
let secret = config::rotate_contact_secret(&mut cfg);
if let Err(e) = config::save_settings(&cfg) {
return IpcMessage::Error {
message: format!("failed to save config: {e}"),
};
}
if self.active.load(Ordering::SeqCst)
&& let Ok(client) = dht::create_pkarr_client(&self.endpoint)
{
let _ = dht::publish_contact(&client, &secret, self.endpoint.id()).await;
}
IpcMessage::ContactIdResponse {
contact_id: secret.public().to_string(),
}
}
/// Stores the network's current group snapshot blob locally and republishes
/// the network's DHT record.
///
/// The record is only published when both a snapshot and the network secret key
/// exist. It points at the snapshot hash and lists this node plus the network's
/// connected peers (sorted, deduplicated) as seeds.
///
/// All failures are non-fatal:
/// - an unknown network is a no-op;
/// - blob-store errors are ignored;
/// - pkarr-client errors skip publishing;
/// - publish errors are logged.
async fn store_and_publish_group(&self, network: &str) {
    let captured = self.networks.get(network).map(|handle| {
        let guard = handle.state.read().unwrap();
        let snapshot = guard.snapshot.as_ref();
        (
            snapshot.map(|snap| snap.hash),
            guard.network_secret_key.clone(),
            snapshot.map(|snap| snap.msgpack_bytes.clone()),
        )
    });
    let Some((hash, net_key, snap_bytes)) = captured else {
        return;
    };
    if let Some(bytes) = snap_bytes {
        let _ = self.blob_store.blobs().add_slice(&bytes).await;
    }
    let (Some(hash), Some(key)) = (hash, net_key) else {
        return;
    };
    let Ok(client) = dht::create_pkarr_client(&self.endpoint) else {
        return;
    };
    let mut seed_peers: Vec<EndpointId> = self
        .peers
        .peers_for_network(network)
        .into_iter()
        .map(|(id, _)| id)
        .chain(std::iter::once(self.endpoint.id()))
        .collect();
    seed_peers.sort_by_key(|id| id.to_string());
    seed_peers.dedup();
    if let Err(e) = dht::publish_network(&client, &key, &hash, &seed_peers).await {
        tracing::warn!(error = %e, "failed to publish network record after accept");
    }
}
/// Adds local firewall rule(s) and persists the resulting config.
///
/// One rule is created per entry of the parsed `port` list, or a single
/// port-less rule when `port` is `None`. All new rules share the same
/// direction, action, protocol, peer filter and optional network scope. Each
/// replaces any existing rule with the same selector and is inserted at the top
/// of the list.
///
/// `peer` is a short id resolved across every joined network. A `network` this
/// node is not on is accepted, but it is logged and mentioned in the reply.
fn firewall_add(
    &self,
    direction: firewall::Direction,
    action: firewall::Action,
    protocol: firewall::Protocol,
    port: Option<&str>,
    peer: Option<&str>,
    network: Option<&str>,
) -> IpcMessage {
    let ports: Vec<Option<firewall::PortRange>> = match port.map(firewall::parse_port_list) {
        None => vec![None],
        Some(Ok(ranges)) => ranges.into_iter().map(Some).collect(),
        Some(Err(e)) => {
            return IpcMessage::Error {
                message: e.to_string(),
            };
        }
    };
    let peer_filter = match peer {
        None => firewall::PeerFilter::Any,
        Some(name) => match self.resolve_short_id_any_network(name) {
            Some(id) => firewall::PeerFilter::Identity(id),
            None => {
                return IpcMessage::Error {
                    message: format!("unknown peer '{name}'"),
                };
            }
        },
    };
    let unknown_network = match network {
        Some(net) if !self.networks.contains_key(net) => Some(net),
        _ => None,
    };
    if let Some(net) = unknown_network {
        tracing::warn!(network = %net, "firewall rule scoped to a network this node is not on");
    }
    let scope = network.map(str::to_string);
    let mut config = (*self.firewall.get_config()).clone();
    for range in &ports {
        let rule = firewall::FirewallRule {
            direction,
            action,
            protocol,
            port: range.clone(),
            peer: peer_filter.clone(),
            network: scope.clone(),
            origin: firewall::RuleOrigin::Local,
        };
        config
            .rules
            .retain(|existing| !firewall::same_selector(existing, &rule));
        config.rules.insert(0, rule);
    }
    self.firewall.update(config.clone());
    if let Err(e) = firewall::save_firewall(&config) {
        tracing::warn!(error = %e, "failed to persist firewall config");
    }
    let count = ports.len();
    let noun = if count == 1 { "rule" } else { "rules" };
    let message = if let Some(net) = unknown_network {
        format!("{count} {noun} added (note: not currently on network '{net}')")
    } else {
        format!("{count} {noun} added")
    };
    IpcMessage::Ok { message }
}
/// Removes the firewall rule at position `index` (0-based) and persists the
/// config.
///
/// An out-of-range index is reported as an error. A failure to persist is only
/// logged, because the in-memory firewall has already been updated.
fn firewall_remove(&self, index: usize) -> IpcMessage {
    let mut config = (*self.firewall.get_config()).clone();
    let total = config.rules.len();
    if index >= total {
        return IpcMessage::Error {
            message: format!("index {index} out of range (have {total} rules)"),
        };
    }
    config.rules.remove(index);
    self.firewall.update(config.clone());
    if let Err(e) = firewall::save_firewall(&config) {
        tracing::warn!(error = %e, "failed to persist firewall config");
    }
    IpcMessage::Ok {
        message: "rule removed".to_string(),
    }
}
/// Reports the current firewall defaults, reject mode, and rule list.
///
/// Rules are rendered as views, with peer identities shown as short ids.
fn firewall_show(&self) -> IpcMessage {
    let snapshot = self.firewall.get_config();
    let to_short = |id: &EndpointId| -> String { id.fmt_short().to_string() };
    let rules = firewall::rule_views(&snapshot.rules, &to_short);
    IpcMessage::FirewallState {
        default_inbound: snapshot.default_inbound,
        default_outbound: snapshot.default_outbound,
        reject: snapshot.reject,
        rules,
    }
}
/// Publishes `suggestions` as the firewall suggestions for `network`.
///
/// Only a coordinator, i.e. a holder of the network secret key, may do this.
/// The suggestions are stored in the network state, after which:
/// 1. the group snapshot is updated and republished;
/// 2. a member sync is broadcast;
/// 3. the suggestions are applied locally via `apply_suggested_firewall`.
///
/// The network-map entry is released before any `.await`.
async fn firewall_suggest(&self, network: &str, suggestions: SuggestedFirewall) -> IpcMessage {
    let lookup = self.networks.get(network).map(|handle| {
        let is_coordinator = handle.state.read().unwrap().network_secret_key.is_some();
        (handle.state.clone(), handle.dht_notify.clone(), is_coordinator)
    });
    let Some((state, dht_notify, is_coordinator)) = lookup else {
        return IpcMessage::Error {
            message: format!("network '{network}' not found"),
        };
    };
    if !is_coordinator {
        return IpcMessage::Error {
            message: "only a coordinator (network key holder) can suggest firewall rules"
                .to_string(),
        };
    }
    let count: usize = suggestions.len();
    {
        let mut guard = state.write().unwrap();
        guard.suggested_firewall = suggestions;
    }
    update_snapshot_and_publish(&state, &self.blob_store, &dht_notify).await;
    broadcast_member_sync(&self.peers, None).await;
    apply_suggested_firewall(&self.firewall, self.endpoint.id(), network, &state);
    IpcMessage::Ok {
        message: format!("published firewall suggestions for '{network}' ({count} subjects)"),
    }
}
fn firewall_suggestions(&self, network: &str) -> IpcMessage {
match self.networks.get(network) {
Some(h) => {
let suggestions = h.state.read().unwrap().suggested_firewall.clone();
IpcMessage::FirewallSuggestionsResponse { suggestions }
}
None => IpcMessage::Error {
message: format!("network '{network}' not found"),
},
}
}
fn firewall_pending(&self, network: &str) -> IpcMessage {
match self.networks.get(network) {
Some(h) => {
let pending = h.state.read().unwrap().pending_suggestions.clone();
let short_id = |id: &EndpointId| -> String { id.fmt_short().to_string() };
IpcMessage::FirewallPendingResponse {
network: network.to_string(),
rules: firewall::rule_views(&pending, &short_id),
}
}
None => IpcMessage::Error {
message: format!("network '{network}' not found"),
},
}
}
/// Resolves individual pending firewall suggestions for `network`.
///
/// Each pending rule is rendered with `firewall::rule_view` (short peer ids)
/// and compared against the caller's `accept` and `deny` views:
/// - accepted rules are appended to the rules this network already
///   contributes to the firewall, and the result is persisted;
/// - denied rules are discarded;
/// - any other rule stays pending.
///
/// A rule listed in both sets is accepted. The reported counts are the numbers
/// of pending rules actually accepted and discarded, not the lengths of the
/// request lists, which may name rules that are not pending.
fn firewall_resolve_suggestions(
    &self,
    network: &str,
    accept: &[FirewallRuleView],
    deny: &[FirewallRuleView],
) -> IpcMessage {
    let short_id = |id: &EndpointId| -> String { id.fmt_short().to_string() };
    // Clone the state handle so the network-map entry is not held while the
    // firewall is updated and persisted.
    let state = match self.networks.get(network) {
        Some(h) => h.state.clone(),
        None => {
            return IpcMessage::Error {
                message: format!("network '{network}' not found"),
            };
        }
    };
    let accept_set: std::collections::HashSet<&FirewallRuleView> = accept.iter().collect();
    let deny_set: std::collections::HashSet<&FirewallRuleView> = deny.iter().collect();
    let mut accepted_rules = Vec::new();
    let mut n_deny = 0usize;
    {
        let mut s = state.write().unwrap();
        let pending = std::mem::take(&mut s.pending_suggestions);
        let mut remaining = Vec::with_capacity(pending.len());
        for rule in pending {
            let view = firewall::rule_view(&rule, &short_id);
            if accept_set.contains(&view) {
                accepted_rules.push(rule);
            } else if deny_set.contains(&view) {
                // Denied: drop the rule, but count it so the reply is accurate.
                n_deny += 1;
            } else {
                remaining.push(rule);
            }
        }
        s.pending_suggestions = remaining;
    }
    let n_accept = accepted_rules.len();
    if !accepted_rules.is_empty() {
        let mut existing: Vec<firewall::FirewallRule> = self
            .firewall
            .get_config()
            .rules
            .iter()
            .filter(|r| matches!(&r.origin, firewall::RuleOrigin::Network(n) if n == network))
            .cloned()
            .collect();
        existing.extend(accepted_rules);
        let config = self.firewall.replace_network_rules(network, existing);
        if let Err(e) = firewall::save_firewall(&config) {
            tracing::warn!(error = %e, "failed to persist firewall config");
        }
    }
    IpcMessage::Ok {
        message: format!(
            "accepted {n_accept}, denied {n_deny} suggested rules for '{network}'"
        ),
    }
}
/// Accepts every pending suggested rule for `network`.
///
/// The network's firewall rules are replaced with the pending set via
/// `replace_network_rules`, and the resulting config is persisted. An empty
/// pending list is reported as an error; the list is still taken (and stays
/// empty) in that case.
fn firewall_accept(&self, network: &str) -> IpcMessage {
    let Some(handle) = self.networks.get(network) else {
        return IpcMessage::Error {
            message: format!("network '{network}' not found"),
        };
    };
    let rules = std::mem::take(&mut handle.state.write().unwrap().pending_suggestions);
    // Release the network-map entry before touching the firewall.
    drop(handle);
    let count = rules.len();
    if count == 0 {
        return IpcMessage::Error {
            message: format!("no pending suggested rules for '{network}'"),
        };
    }
    let config = self.firewall.replace_network_rules(network, rules);
    if let Err(e) = firewall::save_firewall(&config) {
        tracing::warn!(error = %e, "failed to persist firewall config");
    }
    IpcMessage::Ok {
        message: format!("accepted {count} suggested rules from '{network}'"),
    }
}
fn firewall_deny(&self, network: &str) -> IpcMessage {
match self.networks.get(network) {
Some(h) => {
let mut s = h.state.write().unwrap();
let count = s.pending_suggestions.len();
s.pending_suggestions.clear();
IpcMessage::Ok {
message: format!("discarded {count} pending suggested rules for '{network}'"),
}
}
None => IpcMessage::Error {
message: format!("network '{network}' not found"),
},
}
}
/// Enables or disables automatic acceptance of firewall suggestions for
/// `network`.
///
/// The setting is persisted in the network's config first. The network's
/// current suggestions are then re-applied via `apply_suggested_firewall`, so
/// the change takes effect immediately.
fn firewall_auto_accept(&self, network: &str, enabled: bool) -> IpcMessage {
    if !self.networks.contains_key(network) {
        return IpcMessage::Error {
            message: format!("network '{network}' not found"),
        };
    }
    let mut net_cfg = match config::load_network(network) {
        Ok(Some(nc)) => nc,
        Ok(None) => {
            return IpcMessage::Error {
                message: format!("network '{network}' not found in config"),
            };
        }
        Err(e) => {
            return IpcMessage::Error {
                message: format!("failed to load config: {e}"),
            };
        }
    };
    net_cfg.auto_accept_firewall = enabled;
    if let Err(e) = config::save_network(&net_cfg) {
        return IpcMessage::Error {
            message: format!("failed to persist auto-accept setting: {e}"),
        };
    }
    if let Some(h) = self.networks.get(network) {
        apply_suggested_firewall(&self.firewall, self.endpoint.id(), network, &h.state);
    }
    let state_word = if enabled { "enabled" } else { "disabled" };
    IpcMessage::Ok {
        message: format!("auto-accept firewall suggestions {state_word} for '{network}'"),
    }
}
fn firewall_default(&self, action: firewall::Action) -> IpcMessage {
let mut config = (*self.firewall.get_config()).clone();
config.default_inbound = action;
self.firewall.update(config.clone());
if let Err(e) = firewall::save_firewall(&config) {
tracing::warn!(error = %e, "failed to persist firewall config");
}
IpcMessage::Ok {
message: format!("inbound default set to {action}"),
}
}
fn firewall_reject(&self, enabled: bool) -> IpcMessage {
let mut config = (*self.firewall.get_config()).clone();
config.reject = enabled;
self.firewall.update(config.clone());
if let Err(e) = firewall::save_firewall(&config) {
tracing::warn!(error = %e, "failed to persist firewall config");
}
IpcMessage::Ok {
message: format!(
"fail-fast reject {}",
if enabled { "on" } else { "off" }
),
}
}
/// Resolves a user-supplied peer name to an endpoint id.
///
/// Lookup order:
/// 1. Qualify the name with `.DNS_DOMAIN` if it is not already, and look it up
///    in the local hostname table.
/// 2. If that yields an IP, map it to a peer via the routing table.
/// 3. Failing that, map the IP via any network's member list.
/// 4. Finally, try the raw name as a short id across all networks.
async fn resolve_peer_name(&self, name: &str) -> Option<EndpointId> {
    let suffix = format!(".{}", crate::DNS_DOMAIN);
    let qualified = if !name.ends_with(&suffix) {
        format!("{name}{suffix}")
    } else {
        name.to_string()
    };
    let resolved = dns::resolve_name(&qualified, &suffix, &self.hostname_table).await;
    if let Some((ip, _)) = resolved {
        if let Some(id) = self.peers.lookup_v4(&ip).map(|route| route.endpoint_id) {
            return Some(id);
        }
        let member_id = self.networks.iter().find_map(|entry| {
            let guard = entry.value().state.read().unwrap();
            guard
                .members
                .all()
                .iter()
                .find(|m| m.ip == ip)
                .map(|m| m.identity)
        });
        if member_id.is_some() {
            return member_id;
        }
    }
    self.resolve_short_id_any_network(name)
}
/// Offers the local file at `path` to `peer`, given as a hostname or short id.
///
/// Steps:
/// 1. Read the file into memory on the blocking thread pool.
/// 2. Add it to the local blob store.
/// 3. Send a `FileOffer` over the files ALPN carrying the file's name, size,
///    guessed MIME type and BLAKE3 hash.
/// 4. Wait up to 5 s for the peer to close the connection.
///
/// NOTE(review): the file is read with the daemon's own privileges, and this
/// method receives no client credentials. Confirm that the IPC dispatcher
/// verifies the caller may read `path`.
async fn send_file(&self, path: &str, peer: &str) -> IpcMessage {
    let peer_id = match self.resolve_peer_name(peer).await {
        Some(id) => id,
        None => {
            return IpcMessage::Error {
                message: format!("unknown peer '{peer}'"),
            };
        }
    };
    let file_path = std::path::Path::new(path);
    // `std::fs::read` blocks. Run it on the blocking pool so a large file does
    // not stall this runtime worker thread.
    let read_path = file_path.to_path_buf();
    let read_result = tokio::task::spawn_blocking(move || std::fs::read(read_path))
        .await
        .unwrap_or_else(|join_err| Err(std::io::Error::other(join_err)));
    let file_bytes = match read_result {
        Ok(b) => b,
        Err(e) => {
            return IpcMessage::Error {
                message: format!("cannot read '{}': {e}", file_path.display()),
            };
        }
    };
    let filename = file_path
        .file_name()
        .map(|n| n.to_string_lossy().to_string())
        .unwrap_or_else(|| "file".to_string());
    let size = file_bytes.len() as u64;
    let mime_type = guess_mime_type(&filename);
    let hash = blake3::hash(&file_bytes);
    if let Err(e) = self.blob_store.blobs().add_slice(&file_bytes).await {
        return IpcMessage::Error {
            message: format!("blob store error: {e}"),
        };
    }
    let msg = control::ControlMsg::FileOffer {
        from: self.endpoint.id(),
        filename: filename.clone(),
        size,
        mime_type: mime_type.clone(),
        blob_hash: hash,
    };
    match transport::connect_to_peer_with_alpn(&self.endpoint, peer_id, transport::FILES_ALPN)
        .await
    {
        Ok(conn) => match conn.open_bi().await {
            Ok((mut send, _)) => {
                if let Err(e) = control::send_msg(&mut send, &msg).await {
                    return IpcMessage::Error {
                        message: format!("failed to send offer: {e}"),
                    };
                }
                // Give the peer a moment to read the offer and close the connection.
                let _ = tokio::time::timeout(Duration::from_secs(5), conn.closed()).await;
            }
            Err(e) => {
                return IpcMessage::Error {
                    message: format!("failed to open stream: {e}"),
                };
            }
        },
        Err(e) => {
            return IpcMessage::Error {
                message: format!("cannot reach peer '{peer}': {e}"),
            };
        }
    }
    IpcMessage::Ok {
        message: format!("offered {} ({}) to {}", filename, format_size(size), peer),
    }
}
/// Lists incoming file offers that have not yet been accepted.
///
/// Each entry carries the offer id, the sender's short id, and the file's name,
/// size and MIME type.
fn list_files(&self) -> IpcMessage {
    let files = {
        let queue = self.protocol_router.pending_files.lock().unwrap();
        queue
            .iter()
            .map(|offer| ipc::PendingFileInfo {
                id: offer.id,
                from: offer.from.fmt_short().to_string(),
                filename: offer.filename.clone(),
                size: offer.size,
                mime_type: offer.mime_type.clone(),
            })
            .collect()
    };
    IpcMessage::FileList { files }
}
async fn accept_file(
&self,
id: u64,
output: Option<String>,
peer_cred: Option<(u32, u32)>,
) -> IpcMessage {
let pending_file = {
let mut pending = self.protocol_router.pending_files.lock().unwrap();
let idx = pending.iter().position(|f| f.id == id);
match idx {
Some(i) => pending.remove(i),
None => {
return IpcMessage::Error {
message: format!("no pending file with id {id}"),
};
}
}
};
let blob_hash = iroh_blobs::Hash::from_bytes(*pending_file.blob_hash.as_bytes());
let conn = match transport::connect_to_peer_with_alpn(
&self.endpoint,
pending_file.from,
iroh_blobs::protocol::ALPN,
)
.await
{
Ok(c) => c,
Err(e) => {
return IpcMessage::Error {
message: format!("cannot reach sender: {e}"),
};
}
};
if let Err(e) = self
.blob_store
.remote()
.fetch(conn, iroh_blobs::HashAndFormat::raw(blob_hash))
.await
{
return IpcMessage::Error {
message: format!("blob fetch failed: {e}"),
};
}
let bytes = match self.blob_store.blobs().get_bytes(blob_hash).await {
Ok(b) => b,
Err(e) => {
return IpcMessage::Error {
message: format!("blob read failed: {e}"),
};
}
};
let dir = match output {
Some(ref p) => PathBuf::from(p),
None => dirs::download_dir().unwrap_or_else(|| {
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join("Downloads")
}),
};
if let Err(e) = std::fs::create_dir_all(&dir) {
return IpcMessage::Error {
message: format!("cannot create directory '{}': {e}", dir.display()),
};
}
let dest = dir.join(&pending_file.filename);
if let Err(e) = std::fs::write(&dest, &bytes) {
return IpcMessage::Error {
message: format!("write failed: {e}"),
};
}
if let Some((uid, gid)) = peer_cred {
use std::os::unix::ffi::OsStrExt;
if let Ok(c) = std::ffi::CString::new(dest.as_os_str().as_bytes()) {
unsafe { libc::chown(c.as_ptr(), uid, gid) };
}
if let Ok(c) = std::ffi::CString::new(dir.as_os_str().as_bytes()) {
unsafe { libc::chown(c.as_ptr(), uid, gid) };
}
}
IpcMessage::Ok {
message: format!("saved to {}", dest.display()),
}
}
/// Starts device pairing and returns a base58 pairing ticket.
///
/// A random 32-byte secret is generated and stored as the active pairing
/// secret, replacing any previous one. The ticket encodes this endpoint's id
/// bytes followed by the secret.
fn start_pairing(&self) -> IpcMessage {
    let secret: [u8; 32] = rand::random();
    let ticket_bytes: Vec<u8> = self
        .endpoint
        .id()
        .as_bytes()
        .iter()
        .chain(secret.iter())
        .copied()
        .collect();
    let ticket = bs58::encode(&ticket_bytes).into_string();
    *self.pairing_secret.lock().unwrap() = Some(secret);
    IpcMessage::PairingTicket { ticket }
}
/// Pairs this device with a primary device.
///
/// Connects to `endpoint_id` over the pairing ALPN and presents the 32-byte
/// pairing `secret` together with this endpoint's id. On success, the verified
/// device certificate returned by the primary device is stored locally.
///
/// Messages are MessagePack-encoded and framed by a 4-byte big-endian length.
/// The secret is validated before any network activity. The response length is
/// capped so a misbehaving peer cannot force a huge allocation.
async fn pair_with_device(&self, endpoint_id: EndpointId, secret: Vec<u8>) -> IpcMessage {
    /// Upper bound on an encoded pairing response. A device certificate is
    /// presumably far smaller; the limit only guards against hostile length
    /// prefixes.
    const MAX_PAIR_RESPONSE_LEN: usize = 64 * 1024;

    let secret_arr: [u8; 32] = match secret.try_into() {
        Ok(a) => a,
        Err(_) => {
            return IpcMessage::Error {
                message: "invalid secret length".to_string(),
            };
        }
    };
    let request = control::PairMsg::Request {
        secret: secret_arr,
        device_pubkey: self.endpoint.id(),
    };
    let request_bytes = match rmp_serde::to_vec_named(&request) {
        Ok(b) => b,
        Err(e) => {
            return IpcMessage::Error {
                message: format!("failed to encode pair request: {e}"),
            };
        }
    };
    let addr: iroh::EndpointAddr = endpoint_id.into();
    let conn = match self.endpoint.connect(addr, PAIR_ALPN).await {
        Ok(c) => c,
        Err(e) => {
            return IpcMessage::Error {
                message: format!("failed to connect to primary device: {e}"),
            };
        }
    };
    let (mut send, mut recv) = match conn.open_bi().await {
        Ok(pair) => pair,
        Err(e) => {
            return IpcMessage::Error {
                message: format!("failed to open stream: {e}"),
            };
        }
    };
    let len = (request_bytes.len() as u32).to_be_bytes();
    if let Err(e) = send.write_all(&len).await {
        return IpcMessage::Error {
            message: format!("failed to send pair request: {e}"),
        };
    }
    if let Err(e) = send.write_all(&request_bytes).await {
        return IpcMessage::Error {
            message: format!("failed to send pair request: {e}"),
        };
    }
    let mut len_buf = [0u8; 4];
    if let Err(e) = recv.read_exact(&mut len_buf).await {
        return IpcMessage::Error {
            message: format!("failed to read pair response: {e}"),
        };
    }
    let body_len = u32::from_be_bytes(len_buf) as usize;
    if body_len > MAX_PAIR_RESPONSE_LEN {
        return IpcMessage::Error {
            message: format!("pair response too large ({body_len} bytes)"),
        };
    }
    let mut body = vec![0u8; body_len];
    if let Err(e) = recv.read_exact(&mut body).await {
        return IpcMessage::Error {
            message: format!("failed to read pair response body: {e}"),
        };
    }
    let response: control::PairMsg = match rmp_serde::from_slice(&body) {
        Ok(r) => r,
        Err(e) => {
            return IpcMessage::Error {
                message: format!("failed to decode pair response: {e}"),
            };
        }
    };
    match response {
        control::PairMsg::Response { cert } => {
            if !cert.verify() {
                return IpcMessage::Error {
                    message: "received invalid device certificate".to_string(),
                };
            }
            if let Err(e) = identity::store_device_cert(&cert) {
                return IpcMessage::Error {
                    message: format!("failed to store device certificate: {e}"),
                };
            }
            IpcMessage::PairingComplete {
                user_identity: cert.user_identity,
            }
        }
        _ => IpcMessage::Error {
            message: "unexpected pairing response".to_string(),
        },
    }
}
}
/// Guesses a MIME type for `filename` with `mime_guess`.
///
/// Falls back to `application/octet-stream` when no type is known.
fn guess_mime_type(filename: &str) -> String {
    let guess = mime_guess::from_path(filename);
    guess.first_or_octet_stream().to_string()
}
fn format_size(bytes: u64) -> String {
humansize::format_size(bytes, humansize::BINARY)
}
/// Collects recent daemon log files for a diagnostics bundle.
///
/// Only files named `rayfish.log*` or `panic.log` in the log directory are
/// considered. They are visited in reverse lexicographic filename order and
/// returned as `(archive_path, bytes)` pairs, where `archive_path` is
/// `logs/<file>`.
///
/// The combined size is capped at `MAX_TOTAL`. When a file does not fit the
/// remaining budget, only its tail (the last bytes written) is kept, and
/// collection stops once the budget is spent. Unreadable files are skipped;
/// an unreadable log directory yields an empty list.
fn collect_recent_logs() -> Vec<(String, Vec<u8>)> {
    const MAX_TOTAL: u64 = 3 * 1024 * 1024;

    /// Reads at most the last `limit` bytes of `path`.
    fn read_tail(path: &std::path::Path, limit: u64) -> std::io::Result<Vec<u8>> {
        use std::io::{Read, Seek, SeekFrom};
        let mut file = std::fs::File::open(path)?;
        let len = file.metadata()?.len();
        if len > limit {
            file.seek(SeekFrom::Start(len - limit))?;
        }
        let mut buf = Vec::with_capacity(len.min(limit) as usize);
        file.take(limit).read_to_end(&mut buf)?;
        Ok(buf)
    }

    let dir = crate::logdir::log_dir();
    let mut entries: Vec<std::path::PathBuf> = match std::fs::read_dir(&dir) {
        Ok(rd) => rd
            .filter_map(|e| e.ok())
            .map(|e| e.path())
            .filter(|p| {
                p.file_name()
                    .and_then(|n| n.to_str())
                    .is_some_and(|n| n.starts_with("rayfish.log") || n == "panic.log")
            })
            .collect(),
        Err(_) => return Vec::new(),
    };
    entries.sort();
    entries.reverse();
    let mut out = Vec::new();
    let mut remaining = MAX_TOTAL;
    for path in entries {
        if remaining == 0 {
            break;
        }
        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        // Read only what fits: a single huge log must not blow past the cap.
        let Ok(bytes) = read_tail(&path, remaining) else {
            continue;
        };
        remaining = remaining.saturating_sub(bytes.len() as u64);
        out.push((format!("logs/{name}"), bytes));
    }
    out
}
/// Writes `files` as a gzip-compressed tar archive at `path`.
///
/// Each `(name, data)` pair becomes an archive entry named `name` with mode
/// `0644`. Both the tar stream and the gzip stream are finalized before
/// returning, and any I/O error is propagated.
fn write_bundle(path: &std::path::Path, files: &[(String, Vec<u8>)]) -> std::io::Result<()> {
    let gz = flate2::write::GzEncoder::new(
        std::fs::File::create(path)?,
        flate2::Compression::default(),
    );
    let mut archive = tar::Builder::new(gz);
    files.iter().try_for_each(|(name, data)| {
        let mut header = tar::Header::new_gnu();
        header.set_size(data.len() as u64);
        header.set_mode(0o644);
        archive.append_data(&mut header, name, data.as_slice())
    })?;
    let gz = archive.into_inner()?;
    gz.finish()?;
    Ok(())
}
/// Daemon entry point.
///
/// Builds the daemon, connects and activates all configured networks, then
/// serves IPC requests until `token` is cancelled. The iroh endpoint is closed
/// before returning, whatever `serve_ipc` returned.
pub async fn run_daemon(token: CancellationToken, stats: Arc<ForwardMetrics>) -> Result<()> {
    check_cgnat_conflict()?;
    // The metrics server handle is kept in a named binding so it is not dropped
    // before this function returns.
    let (daemon, _metrics_guard, promote_rx) = build_daemon(token.clone(), stats).await?;
    daemon.connect_all_networks().await;
    daemon.activate(None).await;
    let outcome = serve_ipc(&daemon, promote_rx, token).await;
    daemon.endpoint.close().await;
    outcome
}
/// Constructs the daemon.
///
/// Loads identity, config and firewall settings; creates the iroh endpoint,
/// blob store and TUN device; spawns the long-running background tasks; and
/// assembles the shared `DaemonState`.
///
/// Returns a tuple of:
/// - the daemon;
/// - the metrics server handle (`None` if it could not be started);
/// - the receiving end of the coordinator-promotion channel, which `run_daemon`
///   hands to `serve_ipc`.
///
/// # Errors
/// Fails if loading the identity, device certificate, collision index or
/// config fails, or if creating the endpoint, the blob directory/store, or the
/// TUN device fails.
async fn build_daemon(
token: CancellationToken,
stats: Arc<ForwardMetrics>,
) -> Result<(
Arc<DaemonState>,
Option<iroh_metrics::service::MetricsServer>,
mpsc::Receiver<String>,
)> {
config::migrate_location();
let key = identity::load_or_create()?;
let public_key = key.public();
let device_cert = identity::load_device_cert()?;
if let Some(ref cert) = device_cert {
tracing::info!(user = %cert.user_identity.fmt_short(), "loaded device certificate");
}
let collision_index = identity::load_collision_index()?;
let identity = IrohIdentityProvider::new(public_key, collision_index);
let my_ip = identity.local_ip();
let mut app_config = config::load()?;
dht::set_discovery_override(&app_config.discovery_dns);
// `contact_secret` takes the config mutably; presumably it creates a contact key
// on first run (TODO confirm). The config is saved right after so the key persists.
let contact_public = config::contact_secret(&mut app_config).public();
if let Err(e) = config::save_settings(&app_config) {
tracing::warn!(error = %e, "failed to persist contact key");
}
// The endpoint accepts one ALPN per configured network that has a public key,
// plus the fixed blob, file-transfer, pairing and connect protocols.
let mut alpns: Vec<Vec<u8>> = app_config
.networks
.iter()
.filter_map(|net| net.network_public_key.as_ref().map(transport::network_alpn))
.collect();
alpns.push(iroh_blobs::protocol::ALPN.to_vec());
alpns.push(transport::FILES_ALPN.to_vec());
alpns.push(PAIR_ALPN.to_vec());
alpns.push(transport::CONNECT_ALPN.to_vec());
// Tor is enabled for the shared endpoint if any configured network requests it.
let use_tor = app_config
.networks
.iter()
.any(|net| net.transport.as_ref().is_some_and(|t| t.is_tor()));
let ep = transport::create_endpoint_with_alpns(
key.clone(),
alpns,
use_tor,
&app_config.relay,
&app_config.discovery_dns,
)
.await?;
let blobs_dir = config::config_dir()?.join("blobs");
std::fs::create_dir_all(&blobs_dir)?;
let blob_store = FsStore::load(&blobs_dir)
.await
.context("failed to open blob store")?;
let blobs_proto = BlobsProtocol::new(&blob_store, None);
let my_ipv6 = derive_ipv6(&identity.local_identity());
let (tun_reader, tun_writer, tun_name) = tun::create(my_ip, my_ipv6)
.await
.context("failed to create TUN device")?;
// Audit logging is optional: if the log cannot be opened, peers are tracked without it.
let peers = match audit::AuditLog::open() {
Ok(log) => PeerTable::with_audit(Arc::new(log)),
Err(e) => {
tracing::warn!(error = %e, "failed to open audit log; peer events will not be audited");
PeerTable::new()
}
};
// A missing or unreadable firewall config falls back to defaults rather than aborting startup.
let fw_config = firewall::load_firewall().unwrap_or_else(|e| {
tracing::warn!(error = %e, "failed to load firewall config, using defaults");
firewall::FirewallConfig::default()
});
let shared_firewall = SharedFirewall::new(fw_config);
shared_firewall.clone().spawn_evictor(token.clone());
// Starts false; `run_daemon` calls `activate` once networks are connected.
let active = Arc::new(AtomicBool::new(false));
// Packets destined for the local TUN device are sent through this bounded
// channel (capacity 256) to the TUN writer task.
let (tun_tx, tun_rx) = mpsc::channel::<Bytes>(256);
forward::spawn_tun_writer(tun_writer, tun_rx, active.clone());
let device_user_map = peers::DeviceUserMap::new();
let hostname_table = dns::new_hostname_table();
let reverse_table = dns::new_reverse_table();
let dns_resolver = std::sync::Arc::new(crate::dns_resolver::Resolver::new(
hostname_table.clone(),
reverse_table.clone(),
));
// Main packet-forwarding task: reads from the TUN device, consults peers,
// firewall and DNS resolver, and can write back through `tun_tx`.
tokio::spawn(forward::run_mesh(
tun_reader,
peers.clone(),
shared_firewall.clone(),
token.clone(),
stats.clone(),
dns_resolver.clone(),
tun_tx.clone(),
));
let mdns_enabled = app_config.mdns_enabled;
if mdns_enabled {
spawn_mdns_discovery(&ep, token.clone());
} else {
tracing::info!("mDNS discovery disabled");
}
// Shared between the protocol router and `DaemonState`, so a secret set by
// `start_pairing` is visible to the router (presumably its pairing handler).
let pairing_secret: Arc<std::sync::Mutex<Option<[u8; 32]>>> =
Arc::new(std::sync::Mutex::new(None));
let protocol_router = Arc::new(ProtocolRouter::new(
blobs_proto,
key.clone(),
pairing_secret.clone(),
));
// Network names to promote to coordinator are sent here; `serve_ipc` drains them.
let (promote_tx, promote_rx) = mpsc::channel::<String>(16);
let daemon = Arc::new(DaemonState {
endpoint: ep,
identity,
peers,
stats: stats.clone(),
start: Instant::now(),
tun_tx,
networks: Arc::new(DashMap::new()),
shutdown_token: token.clone(),
blob_store,
firewall: shared_firewall,
protocol_router: protocol_router.clone(),
hostname_table,
reverse_table,
mdns_enabled,
tun_name,
pairing_secret,
device_cert,
device_user_map,
contact_public,
active: active.clone(),
dns_configurator: Arc::new(std::sync::Mutex::new(None)),
resolver: dns_resolver.clone(),
dns_reassert_token: std::sync::Mutex::new(None),
promote_tx,
});
protocol_router.spawn_accept_loop(daemon.endpoint.clone(), token.clone());
// If no pkarr client can be created, the contact record is not published
// (and nothing is logged here).
if let Ok(pkarr_client) = dht::create_pkarr_client(&daemon.endpoint) {
spawn_contact_publisher(
pkarr_client,
daemon.endpoint.id(),
token.clone(),
);
}
let metrics_server =
spawn_metrics_server(stats, daemon.peers.clone(), &daemon.endpoint, token).await;
tracing::info!(ip = %my_ip, id = %daemon.endpoint.id().fmt_short(), "daemon started");
Ok((daemon, metrics_server, promote_rx))
}
fn spawn_mdns_discovery(ep: &Endpoint, token: CancellationToken) {
let mdns = match iroh_mdns_address_lookup::MdnsAddressLookup::builder()
.service_name("rayfish")
.advertise(true)
.build(ep.id())
{
Ok(mdns) => mdns,
Err(e) => {
tracing::warn!(error = %e, "failed to start mDNS discovery");
return;
}
};
let Ok(lookups) = ep.address_lookup() else {
return;
};
lookups.add(mdns.clone());
tracing::info!("mDNS discovery enabled (advertising _rayfish._udp.local)");
tokio::spawn(async move {
use futures::StreamExt;
let mut events = mdns.subscribe().await;
loop {
tokio::select! {
_ = token.cancelled() => break,
event = events.next() => match event {
Some(iroh_mdns_address_lookup::DiscoveryEvent::Discovered { endpoint_info, .. }) => {
tracing::info!(
peer = %endpoint_info.endpoint_id.fmt_short(),
"mDNS: peer discovered on LAN"
);
}
Some(iroh_mdns_address_lookup::DiscoveryEvent::Expired { endpoint_id }) => {
tracing::info!(
peer = %endpoint_id.fmt_short(),
"mDNS: peer left LAN"
);
}
None => break,
_ => {}
}
}
}
});
}
/// Starts the Prometheus metrics server on `0.0.0.0:9090`.
///
/// The registry contains the forwarding stats, per-peer metrics (fed by a
/// collector task over `peers` until `token` is cancelled), and the endpoint's
/// own metrics.
///
/// Returns `None`, after logging a warning, if the server cannot be started.
/// Metrics export is optional and never fails daemon startup.
///
/// NOTE(review): binding to all interfaces exposes these metrics to anything
/// that can reach port 9090. That presumably includes mesh peers over the TUN
/// interface unless the firewall blocks it. Consider loopback or a config option.
async fn spawn_metrics_server(
    stats: Arc<ForwardMetrics>,
    peers: PeerTable,
    endpoint: &Endpoint,
    token: CancellationToken,
) -> Option<iroh_metrics::service::MetricsServer> {
    let peer_metrics = Arc::new(crate::stats::PeerMetrics::default());
    let mut registry = iroh_metrics::Registry::default();
    registry.register(stats);
    registry.register(Arc::clone(&peer_metrics));
    peer_metrics.spawn_collector(peers, token);
    registry.register_all(endpoint.metrics());
    let bind_addr = SocketAddr::from(([0, 0, 0, 0], 9090));
    iroh_metrics::service::MetricsServer::spawn(bind_addr, Arc::new(registry))
        .await
        .inspect(|server| {
            tracing::info!(addr = %server.local_addr(), "metrics server started");
        })
        .inspect_err(|e| {
            tracing::warn!(error = %e, "failed to start metrics server (Prometheus export disabled)");
        })
        .ok()
}
async fn serve_ipc(
daemon: &Arc<DaemonState>,
mut promote_rx: mpsc::Receiver<String>,
token: CancellationToken,
) -> Result<()> {
let socket_path = ipc::socket_path();
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
if socket_path.exists() {
std::fs::remove_file(&socket_path)?;
}
let listener = UnixListener::bind(&socket_path).context("failed to bind IPC socket")?;
set_socket_permissions(&socket_path);
tracing::info!(path = %socket_path.display(), "IPC socket listening");
loop {
tokio::select! {
_ = token.cancelled() => {
tracing::info!("daemon shutting down");
daemon.deactivate().await;
let _ = std::fs::remove_file(&socket_path);
return Ok(());
}
Some(net) = promote_rx.recv() => {
daemon.promote_to_coordinator(&net).await;
}
result = listener.accept() => match result {
Ok((stream, _)) => {
let daemon = daemon.clone();
tokio::spawn(async move {
if let Err(e) = handle_ipc_client(stream, &daemon).await {
tracing::debug!(error = %e, "IPC client error");
}
});
}
Err(e) => tracing::warn!(error = %e, "IPC accept error"),
}
}
}
}
/// Makes the IPC socket at `path` connectable by every local user (mode `0666`).
///
/// Per the log message, authorization happens per request using the client's
/// peer credentials, not via socket file permissions.
///
/// If the mode cannot be set, a warning is logged. The success message is only
/// logged when the mode was actually applied; previously the `chmod` result was
/// ignored.
fn set_socket_permissions(path: &std::path::Path) {
    use std::os::unix::fs::PermissionsExt;
    match std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o666)) {
        Ok(()) => {
            tracing::info!("IPC socket mode 0666 (per-request authorization via peer creds)");
        }
        Err(e) => {
            tracing::warn!(
                error = %e,
                path = %path.display(),
                "failed to set IPC socket permissions"
            );
        }
    }
}
/// Serves one IPC client with a single request/response exchange.
///
/// Reads one request and dispatches it to the daemon, along with the client's
/// `(uid, gid)` when the OS reports it, then writes back the response.
///
/// NOTE(review): there is no read timeout, so a client that connects and never
/// sends keeps its task alive indefinitely.
async fn handle_ipc_client(stream: UnixStream, daemon: &Arc<DaemonState>) -> Result<()> {
    let creds = match stream.peer_cred() {
        Ok(cred) => Some((cred.uid(), cred.gid())),
        Err(_) => None,
    };
    let mut framed = ipc::framed(stream);
    let request = ipc::recv(&mut framed).await?;
    let response = daemon.handle_request(request, creds).await;
    ipc::send(&mut framed, response).await?;
    Ok(())
}
#[allow(clippy::too_many_arguments)]
fn spawn_network_publisher(
client: PkarrRelayClient,
net_secret_key: SecretKey,
state: Arc<std::sync::RwLock<NetworkState>>,
endpoint_id: EndpointId,
peers: PeerTable,
network_name: String,
notify: Arc<tokio::sync::Notify>,
token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let hash = {
let s = state.read().unwrap();
s.snapshot
.as_ref()
.map(|snap| snap.hash)
.unwrap_or_else(|| {
group_blob_hash(
&s.members,
&s.approved,
&s.suggested_firewall,
s.network_name.as_deref(),
&s.reusable_keys,
)
})
};
let mut seed_peers: Vec<EndpointId> = peers
.peers_for_network(&network_name)
.into_iter()
.map(|(id, _)| id)
.collect();
seed_peers.push(endpoint_id);
seed_peers.sort_by_key(|id| id.to_string());
seed_peers.dedup();
match dht::publish_network(&client, &net_secret_key, &hash, &seed_peers).await {
Ok(()) => tracing::info!(peers = seed_peers.len(), "published network record"),
Err(e) => tracing::warn!(error = %e, "failed to publish network record"),
}
tokio::select! {
_ = token.cancelled() => break,
_ = notify.notified() => {},
_ = tokio::time::sleep(Duration::from_secs(300)) => {},
}
}
})
}
fn spawn_contact_publisher(
client: PkarrRelayClient,
endpoint_id: EndpointId,
token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let secret = config::load().ok().and_then(|c| c.contact_secret_key);
if let Some(secret) = secret {
match dht::publish_contact(&client, &secret, endpoint_id).await {
Ok(()) => {
tracing::debug!(contact = %secret.public().fmt_short(), "published contact record")
}
Err(e) => tracing::warn!(error = %e, "failed to publish contact record"),
}
}
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(150)) => {},
}
}
})
}
#[allow(clippy::too_many_arguments)]
fn spawn_lazy_publisher(
client: PkarrRelayClient,
net_secret_key: SecretKey,
state: Arc<std::sync::RwLock<NetworkState>>,
endpoint_id: EndpointId,
peers: PeerTable,
network_name: String,
token: CancellationToken,
) -> JoinHandle<()> {
const LAZY_PUBLISH_INTERVAL: Duration = Duration::from_secs(10);
tokio::spawn(async move {
let mut last_hash: Option<blake3::Hash> = None;
loop {
let hash = {
let s = state.read().unwrap();
s.snapshot
.as_ref()
.map(|snap| snap.hash)
.unwrap_or_else(|| {
group_blob_hash(
&s.members,
&s.approved,
&s.suggested_firewall,
s.network_name.as_deref(),
&s.reusable_keys,
)
})
};
if last_hash != Some(hash) {
let mut seed_peers: Vec<EndpointId> = peers
.peers_for_network(&network_name)
.into_iter()
.map(|(id, _)| id)
.collect();
seed_peers.push(endpoint_id);
seed_peers.sort_by_key(|id| id.to_string());
seed_peers.dedup();
match dht::publish_network(&client, &net_secret_key, &hash, &seed_peers).await {
Ok(()) => {
tracing::info!(
network = %network_name,
"lazy publisher: published network record"
);
last_hash = Some(hash);
}
Err(e) => tracing::warn!(error = %e, "lazy publisher: publish failed"),
}
}
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(LAZY_PUBLISH_INTERVAL) => {},
}
}
})
}
/// Turns the network's suggested firewall into concrete rules for this node.
///
/// Suggestions are materialized relative to our own hostname, with peer
/// hostnames resolved through the current roster. If we have no hostname in the
/// roster, nothing happens. Otherwise:
/// - when the network's `auto_accept_firewall` config flag is set, the rules
///   replace this network's firewall rules, are persisted, and any pending
///   suggestions are cleared;
/// - otherwise the rules are stored in `pending_suggestions` for operator review.
///
/// A failed config load counts as "not auto-accepted". A failed firewall save is
/// logged and does not abort.
fn apply_suggested_firewall(
    firewall: &SharedFirewall,
    my_identity: EndpointId,
    network_name: &str,
    state: &std::sync::RwLock<NetworkState>,
) {
    let (suggestions, members): (SuggestedFirewall, Vec<Member>) = {
        let guard = state.read().unwrap();
        let roster: Vec<Member> = guard.members.all().into_iter().cloned().collect();
        (guard.suggested_firewall.clone(), roster)
    };

    let me = members.iter().find(|m| m.identity == my_identity);
    let Some(my_hostname) = me.and_then(|m| m.hostname.clone()) else {
        return;
    };

    // Later members win on duplicate hostnames, like collecting into a map.
    let mut by_hostname: HashMap<&str, EndpointId> = HashMap::new();
    for m in &members {
        if let Some(h) = m.hostname.as_deref() {
            by_hostname.insert(h, m.identity);
        }
    }
    let resolve = |h: &str| by_hostname.get(h).copied();
    let rules =
        firewall::materialize_suggestions(network_name, &my_hostname, &suggestions, &resolve);

    let auto_accept = match config::load() {
        Ok(cfg) => cfg
            .networks
            .into_iter()
            .find(|n| n.name == network_name)
            .is_some_and(|n| n.auto_accept_firewall),
        Err(_) => false,
    };

    if !auto_accept {
        let count = rules.len();
        state.write().unwrap().pending_suggestions = rules;
        tracing::info!(
            network = network_name,
            count,
            "queued suggested firewall rules for review"
        );
        return;
    }

    let config = firewall.replace_network_rules(network_name, rules);
    if let Err(e) = firewall::save_firewall(&config) {
        tracing::warn!(error = %e, network = network_name, "failed to persist firewall config");
    }
    state.write().unwrap().pending_suggestions.clear();
    tracing::info!(
        network = network_name,
        "auto-accepted suggested firewall rules"
    );
}
/// Looks up the signed network record for `net_pubkey`.
///
/// Returns the published group-blob hash and the seed peers listed in the
/// record. Returns `None` if the pkarr client cannot be created or resolution
/// fails; the underlying errors are discarded.
async fn resolve_signed(
    endpoint: &Endpoint,
    net_pubkey: EndpointId,
) -> Option<(blake3::Hash, Vec<EndpointId>)> {
    match dht::create_pkarr_client(endpoint) {
        Ok(client) => dht::resolve_network(&client, net_pubkey).await.ok(),
        Err(_) => None,
    }
}
/// Downloads the group blob identified by the signed hash and verifies it.
///
/// Candidates are this network's known peers plus `seeds`, sorted by string form
/// and de-duplicated. They are tried in that order. For each candidate the blob
/// is fetched over the iroh-blobs ALPN into `blob_store`, read back, and checked
/// with `membership::verify_group_blob`. The first blob that verifies is
/// returned. Any failure moves on to the next candidate; `None` means no
/// candidate produced a verified blob.
async fn fetch_verified_blob(
    endpoint: &Endpoint,
    blob_store: &FsStore,
    peers: &PeerTable,
    signed: blake3::Hash,
    network_name: &str,
    seeds: &[EndpointId],
) -> Option<crate::membership::GroupBlob> {
    let blob_hash = iroh_blobs::Hash::from_bytes(*signed.as_bytes());

    let mut candidates: Vec<EndpointId> = Vec::new();
    candidates.extend(
        peers
            .peers_for_network(network_name)
            .into_iter()
            .map(|(id, _)| id),
    );
    candidates.extend_from_slice(seeds);
    candidates.sort_by_key(|id| id.to_string());
    candidates.dedup();

    for candidate in candidates {
        let Ok(conn) =
            transport::connect_to_peer_with_alpn(endpoint, candidate, iroh_blobs::protocol::ALPN)
                .await
        else {
            continue;
        };
        let fetched = blob_store
            .remote()
            .fetch(conn, HashAndFormat::raw(blob_hash))
            .await;
        if fetched.is_err() {
            continue;
        }
        let Ok(bytes) = blob_store.blobs().get_bytes(blob_hash).await else {
            continue;
        };
        if let Ok(data) = crate::membership::verify_group_blob(&bytes, &signed) {
            return Some(data);
        }
    }
    None
}
/// Re-synchronizes this node's view of a network from its signed DHT record.
///
/// 1. Resolve the signed record (group-blob hash plus seed peers). If it is
///    unavailable, log at debug level and stop.
/// 2. Pass the current snapshot hash and the signed hash to
///    `membership::trusted_reconverge_hash`. If that returns `None`, nothing is
///    applied; we only retry delivery of any pending rename against the
///    current roster.
/// 3. Otherwise fetch and verify the blob, apply the IP tiebreak (warning if
///    duplicates remain), and replace members, approved entries and suggested
///    firewall in `state` while refreshing the snapshot. Then update DNS,
///    re-apply firewall suggestions, and retry the pending rename against the
///    new roster.
#[allow(clippy::too_many_arguments)]
async fn reconverge_and_apply(
    endpoint: &Endpoint,
    blob_store: &FsStore,
    peers: &PeerTable,
    net_pubkey: EndpointId,
    network_name: &str,
    state: &Arc<std::sync::RwLock<NetworkState>>,
    my_identity: EndpointId,
    hostname_table: &dns::HostnameTable,
    reverse_table: &dns::ReverseLookupTable,
    firewall: &SharedFirewall,
    alpn: &[u8],
    my_ip: Ipv4Addr,
    device_cert: &Option<control::DeviceCert>,
) {
    let current = state.read().unwrap().snapshot.as_ref().map(|s| s.hash);
    let record = resolve_signed(endpoint, net_pubkey).await;
    let Some((signed, seeds)) = record else {
        tracing::debug!(network = %network_name, "reconverge: signed record unavailable");
        return;
    };

    let accepted = crate::membership::trusted_reconverge_hash(current, signed);
    if accepted.is_none() {
        let roster: Vec<Member> = {
            let guard = state.read().unwrap();
            guard.members.all().into_iter().cloned().collect()
        };
        drain_pending_rename(
            endpoint,
            &roster,
            alpn,
            network_name,
            my_identity,
            my_ip,
            device_cert,
        )
        .await;
        return;
    }

    let fetched =
        fetch_verified_blob(endpoint, blob_store, peers, signed, network_name, &seeds).await;
    let Some(data) = fetched else {
        tracing::warn!(network = %network_name, "reconverge: could not fetch verified blob");
        return;
    };

    let tiebroken = crate::membership::resolve_ip_tiebreak(data.members.clone());
    if let Err(e) = crate::membership::validate_no_duplicate_ips(&tiebroken) {
        tracing::warn!(network = %network_name, error = %e, "roster still has duplicate IPs after tiebreak; applying tiebroken version");
    }

    // Apply and read back the roster under the same write guard, so the roster
    // used below is exactly what was stored.
    let roster: Vec<Member> = {
        let mut guard = state.write().unwrap();
        guard.members = MemberList::from_members(tiebroken);
        guard.approved = ApprovedList::from_entries(data.approved.clone());
        guard.suggested_firewall = data.suggested_firewall.clone();
        guard.refresh_snapshot();
        guard.members.all().into_iter().cloned().collect()
    };

    apply_roster_to_dns(
        &roster,
        network_name,
        my_identity,
        hostname_table,
        reverse_table,
    )
    .await;
    apply_suggested_firewall(firewall, my_identity, network_name, state);
    drain_pending_rename(
        endpoint,
        &roster,
        alpn,
        network_name,
        my_identity,
        my_ip,
        device_cert,
    )
    .await;
    tracing::info!(network = %network_name, "reconverged from signed record");
}
/// Orders the coordinators to dial, excluding ourselves (`me`).
///
/// If `minter` is a coordinator other than us, it comes first. The remaining
/// coordinators follow in roster order, each listed once.
fn coordinator_dial_order(
    minter: EndpointId,
    members: &[Member],
    me: EndpointId,
) -> Vec<EndpointId> {
    let minter_is_coordinator = members
        .iter()
        .any(|m| m.identity == minter && m.is_coordinator);

    let mut order: Vec<EndpointId> = Vec::new();
    if minter != me && minter_is_coordinator {
        order.push(minter);
    }

    let others = members
        .iter()
        .filter(|m| m.is_coordinator && m.identity != me);
    for m in others {
        if !order.contains(&m.identity) {
            order.push(m.identity);
        }
    }
    order
}
/// Returns every coordinator in `members` except `me`, in roster order.
fn gossip_targets(members: &[Member], me: EndpointId) -> Vec<EndpointId> {
    members
        .iter()
        .filter_map(|m| (m.is_coordinator && m.identity != me).then_some(m.identity))
        .collect()
}
/// Reports whether `peer` is listed in the roster with the coordinator flag set.
fn sender_is_coordinator(state: &Arc<std::sync::RwLock<NetworkState>>, peer: EndpointId) -> bool {
    let guard = state.read().unwrap();
    guard
        .members
        .all()
        .into_iter()
        .filter(|m| m.is_coordinator)
        .any(|m| m.identity == peer)
}
/// Best-effort fan-out of `msg` to the other coordinators of `network`.
///
/// Only coordinators that currently have a connection in the peer table are
/// reached. Each gets a fresh bidirectional stream. Stream-open and send
/// failures are ignored.
async fn gossip_to_coordinators(
    peers: &PeerTable,
    network: &str,
    members: &[Member],
    me: EndpointId,
    msg: &ControlMsg,
) {
    let targets = gossip_targets(members, me);
    if targets.is_empty() {
        return;
    }
    let reachable = peers
        .peers_for_network_with_conn(network)
        .into_iter()
        .filter(|(eid, _, _)| targets.contains(eid));
    for (_, _, conn) in reachable {
        let Ok((mut send, _recv)) = conn.open_bi().await else {
            continue;
        };
        let _ = control::send_msg(&mut send, msg).await;
    }
}
/// Result of dialing a single coordinator.
///
/// NOTE(review): the variant meanings below are inferred from their names and
/// from `pick_first_welcome`; confirm against the dialing code.
#[derive(Debug, PartialEq, Clone, Copy)]
#[allow(dead_code)]
enum DialOutcome {
    /// The coordinator welcomed us.
    Welcomed,
    /// The coordinator refused.
    Denied,
    /// The coordinator could not be reached.
    Unreachable,
}
/// Finds the first `Welcomed` outcome.
///
/// Returns `(index, true)` for the first welcome. If there is none, returns
/// `(last index, false)`, or `(0, false)` for an empty slice.
#[allow(dead_code)]
fn pick_first_welcome(outcomes: &[DialOutcome]) -> (usize, bool) {
    outcomes
        .iter()
        .position(|o| *o == DialOutcome::Welcomed)
        .map_or((outcomes.len().saturating_sub(1), false), |i| (i, true))
}
/// Rebuilds the member roster for `network_name` from the persisted config.
///
/// The config stores only identity, IP, coordinator flag and hostname. Cert and
/// user-identity fields are left empty and `collision_index` is `0`. If the
/// config cannot be loaded or the network is not present, the roster is empty.
fn persisted_roster(network_name: &str) -> Vec<Member> {
    let Ok(cfg) = config::load() else {
        return Vec::new();
    };
    let Some(net) = cfg.networks.into_iter().find(|n| n.name == network_name) else {
        return Vec::new();
    };
    let mut roster = Vec::with_capacity(net.members.len());
    for entry in net.members {
        roster.push(Member {
            identity: entry.identity,
            ip: entry.ip,
            is_coordinator: entry.is_coordinator,
            hostname: entry.hostname,
            user_identity: None,
            device_cert: None,
            collision_index: 0,
        });
    }
    roster
}
#[allow(clippy::too_many_arguments)]
fn spawn_group_poller(
client: PkarrRelayClient,
net_pubkey: EndpointId,
state: Arc<std::sync::RwLock<NetworkState>>,
endpoint: Endpoint,
blob_store: FsStore,
peers: PeerTable,
network_name: String,
fw: SharedFirewall,
token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(Duration::from_secs(60)) => {},
}
let current_hash = {
let s = state.read().unwrap();
s.snapshot.as_ref().map(|snap| snap.hash)
};
let (remote_hash, _seed_peers) = match dht::resolve_network(&client, net_pubkey).await {
Ok(r) => r,
Err(e) => {
tracing::debug!(error = %e, "group poll failed");
continue;
}
};
if current_hash == Some(remote_hash) {
continue;
}
tracing::info!(old = ?current_hash, new = %remote_hash, "group blob changed");
let blob_hash = iroh_blobs::Hash::from_bytes(*remote_hash.as_bytes());
let peer_ids: Vec<EndpointId> = peers
.peers_for_network(&network_name)
.into_iter()
.map(|(id, _)| id)
.collect();
let mut new_data = None;
for peer_id in &peer_ids {
let conn = match transport::connect_to_peer_with_alpn(
&endpoint,
*peer_id,
iroh_blobs::protocol::ALPN,
)
.await
{
Ok(c) => c,
Err(_) => continue,
};
if blob_store
.remote()
.fetch(conn, HashAndFormat::raw(blob_hash))
.await
.is_err()
{
continue;
}
match blob_store.blobs().get_bytes(blob_hash).await {
Ok(bytes) => match crate::membership::decode_group_blob(&bytes) {
Ok(data) => {
new_data = Some(data);
break;
}
Err(_) => continue,
},
Err(_) => continue,
}
}
let Some(data) = new_data else {
tracing::warn!("could not fetch updated group blob from any peer");
continue;
};
let old_members: Vec<EndpointId> = {
let s = state.read().unwrap();
s.members.all().iter().map(|m| m.identity).collect()
};
let new_member_ids: std::collections::HashSet<EndpointId> =
data.members.iter().map(|m| m.identity).collect();
for old_id in &old_members {
if !new_member_ids.contains(old_id) {
let s = state.read().unwrap();
if let Some(member) = s.members.get(old_id) {
peers.remove(&member.ip, &derive_ipv6(old_id));
tracing::info!(peer = %old_id.fmt_short(), "removed kicked peer");
}
}
}
let my_id = endpoint.id();
if !new_member_ids.contains(&my_id)
&& !data.approved.iter().any(|a| a.identity == my_id)
{
tracing::warn!("we have been removed from the network");
break;
}
{
let mut s = state.write().unwrap();
s.members = MemberList::from_members(data.members.clone());
s.approved = ApprovedList::from_entries(data.approved.clone());
s.suggested_firewall = data.suggested_firewall.clone();
s.refresh_snapshot();
}
apply_suggested_firewall(&fw, endpoint.id(), &network_name, &state);
}
})
}
/// Coordinator-only context that `spawn_peer_cleanup` uses to prune a member
/// from the roster after an intentional disconnect (leave).
struct CoordinatorCleanup {
    /// Shared network state; the leaving member is removed from `members`.
    state: Arc<std::sync::RwLock<NetworkState>>,
    /// Blob store passed to `update_snapshot_and_publish` after pruning.
    blob_store: FsStore,
    /// Optional notifier passed to `update_snapshot_and_publish` to wake the publisher.
    dht_notify: Option<Arc<tokio::sync::Notify>>,
    /// Forward DNS table; the leaver's hostname is removed by IP.
    hostname_table: dns::HostnameTable,
    /// Reverse DNS table, updated together with `hostname_table`.
    reverse_table: dns::ReverseLookupTable,
    /// Maps the disconnecting device id to the member id that gets removed.
    device_user_map: peers::DeviceUserMap,
    /// Network whose DNS entries are pruned.
    network_name: String,
}
fn spawn_peer_cleanup(
mut disconnect_rx: mpsc::Receiver<forward::DisconnectEvent>,
peers: PeerTable,
token: CancellationToken,
coordinator: Option<CoordinatorCleanup>,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
tokio::select! {
_ = token.cancelled() => return,
event = disconnect_rx.recv() => {
match event {
Some(ev) => {
tracing::info!(peer = %ev.endpoint_id.fmt_short(), ip = %ev.ip, network = %ev.network, intentional = ev.intentional, "removing dead peer");
peers.remove_peer_from_network(&ev.ip, &ev.ipv6, &ev.network);
if ev.intentional && let Some(c) = &coordinator {
let member_id = c.device_user_map.resolve(&ev.endpoint_id);
c.state.write().unwrap().members.remove(&member_id);
dns::remove_hostname_by_ip(
&c.hostname_table,
&c.reverse_table,
&c.network_name,
ev.ip,
)
.await;
update_snapshot_and_publish(&c.state, &c.blob_store, &c.dht_notify).await;
broadcast_member_sync(&peers, None).await;
tracing::info!(peer = %member_id.fmt_short(), "pruned member after leave");
}
}
None => return,
}
}
}
}
})
}
/// Spawns the coordinator-side reader for control streams opened by a member on `conn`.
///
/// Each accepted bidirectional stream carries one `ControlMsg`. Messages pass a
/// per-connection `ControlGate`: excess messages are dropped, and a flood closes
/// the connection with `forward::ABUSE_CODE`. Handled messages:
/// - `InviteShare` / `InviteUsed`: accepted only from roster coordinators. Under
///   `invite_lock` they are recorded in, or burned from, the network's `InviteStore`.
/// - `Ping` is answered with a pong. `Pong` completes the matching `pending_pongs` entry.
/// - `MeshHello`: a valid device certificate bound to `remote_id` is recorded.
///   The requested hostname is then assigned after collision resolution, and DNS
///   is refreshed for `peer_ip`. Only when the stored hostname actually changed is
///   the snapshot republished and a `MemberSync` broadcast.
///
/// Other messages are ignored. The task ends when `token` is cancelled or the
/// connection stops yielding streams.
#[allow(clippy::too_many_arguments)]
fn spawn_coordinator_control_reader(
    conn: Connection,
    remote_id: EndpointId,
    peer_ip: Ipv4Addr,
    network_name: String,
    state: Arc<std::sync::RwLock<NetworkState>>,
    hostname_table: dns::HostnameTable,
    reverse_table: dns::ReverseLookupTable,
    device_user_map: peers::DeviceUserMap,
    peers: PeerTable,
    blob_store: FsStore,
    dht_notify: Option<Arc<tokio::sync::Notify>>,
    token: CancellationToken,
    invite_lock: Arc<tokio::sync::Mutex<()>>,
    pending_pongs: Arc<DashMap<u64, tokio::sync::oneshot::Sender<()>>>,
) {
    tokio::spawn(async move {
        let mut gate = crate::ratelimit::ControlGate::new();
        loop {
            let accepted = tokio::select! {
                _ = token.cancelled() => return,
                r = conn.accept_bi() => r,
            };
            // An accept error means the connection is gone; stop reading.
            let mut recv = match accepted {
                Ok((_send, recv)) => recv,
                Err(_) => return,
            };
            // A stream that does not yield a valid message is skipped.
            let msg = match control::recv_msg(&mut recv).await {
                Ok(m) => m,
                Err(_) => continue,
            };
            match gate.check() {
                crate::ratelimit::Verdict::Allow => {}
                crate::ratelimit::Verdict::Drop => continue,
                crate::ratelimit::Verdict::Close => {
                    tracing::warn!(peer = %remote_id.fmt_short(), "control-plane flood; closing connection");
                    conn.close(VarInt::from_u32(forward::ABUSE_CODE), b"control flood");
                    return;
                }
            }
            match msg {
                ControlMsg::InviteShare {
                    id,
                    secret_hash,
                    expires,
                } => {
                    if !sender_is_coordinator(&state, remote_id) {
                        tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteShare from non-coordinator");
                        continue;
                    }
                    let Ok(hash) = String::from_utf8(secret_hash) else {
                        tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteShare with non-utf8 hash");
                        continue;
                    };
                    let _guard = invite_lock.lock().await;
                    if let Ok(mut store) = crate::invite::InviteStore::load(&network_name) {
                        let _ = store.record_shared(id, hash, expires);
                    }
                    continue;
                }
                ControlMsg::InviteUsed { secret_hash } => {
                    if !sender_is_coordinator(&state, remote_id) {
                        tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteUsed from non-coordinator");
                        continue;
                    }
                    let Ok(hash) = String::from_utf8(secret_hash) else {
                        tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteUsed with non-utf8 hash");
                        continue;
                    };
                    let _guard = invite_lock.lock().await;
                    if let Ok(mut store) = crate::invite::InviteStore::load(&network_name) {
                        let _ = store.burn_by_hash(&hash);
                    }
                    continue;
                }
                ControlMsg::Ping { nonce } => {
                    respond_pong(&conn, nonce).await;
                    continue;
                }
                ControlMsg::Pong { nonce } => {
                    if let Some((_, tx)) = pending_pongs.remove(&nonce) {
                        let _ = tx.send(());
                    }
                    continue;
                }
                _ => {}
            }
            let ControlMsg::MeshHello {
                hostname,
                device_cert,
                ..
            } = msg
            else {
                continue;
            };
            // Record the certificate only if it verifies and belongs to this peer.
            if let Some(ref cert) = device_cert
                && cert.verify()
                && cert.device_key == remote_id
            {
                {
                    let mut s = state.write().unwrap();
                    if let Some(m) = s.members.get_mut(&remote_id) {
                        m.user_identity = Some(cert.user_identity);
                        m.device_cert = Some(cert.clone());
                    }
                }
                device_user_map.insert(remote_id, cert.user_identity);
            }
            let Some(desired) = hostname else { continue };
            tracing::info!(
                network = %network_name,
                peer = %remote_id.fmt_short(),
                desired = %desired,
                "coordinator received MeshHello hostname"
            );
            // Resolve the collision and store the result under a single write lock.
            // With separate read and write locks, two readers handling concurrent
            // MeshHellos could both see the same name as free and both claim it.
            let (final_hostname, changed) = {
                let mut s = state.write().unwrap();
                let taken: Vec<String> = s
                    .members
                    .all()
                    .iter()
                    .filter(|m| m.identity != remote_id)
                    .filter_map(|m| m.hostname.clone())
                    .collect();
                let taken_refs: Vec<&str> = taken.iter().map(|t| t.as_str()).collect();
                let final_hostname = crate::hostname::resolve_collision(&desired, &taken_refs);
                let old = s
                    .members
                    .all()
                    .iter()
                    .find(|m| m.identity == remote_id)
                    .and_then(|m| m.hostname.clone());
                let changed = old.as_deref() != Some(final_hostname.as_str());
                if changed && let Some(m) = s.members.get_mut(&remote_id) {
                    m.hostname = Some(final_hostname.clone());
                }
                (final_hostname, changed)
            };
            dns::remove_hostname_by_ip(&hostname_table, &reverse_table, &network_name, peer_ip)
                .await;
            let ipv6 = derive_ipv6(&remote_id);
            dns::update_hostname(
                &hostname_table,
                &reverse_table,
                &network_name,
                &final_hostname,
                peer_ip,
                ipv6,
            )
            .await;
            if changed {
                tracing::info!(peer = %remote_id.fmt_short(), network = %network_name, hostname = %final_hostname, "peer hostname changed; republishing blob + broadcasting MemberSync");
                update_snapshot_and_publish(&state, &blob_store, &dht_notify).await;
                broadcast_member_sync(&peers, None).await;
            } else {
                tracing::debug!(peer = %remote_id.fmt_short(), network = %network_name, hostname = %final_hostname, "peer hostname unchanged; no republish (idempotent MeshHello)");
            }
        }
    });
}
/// Chooses which path entry to report.
///
/// The entry flagged as selected wins. Otherwise the first path of the most
/// preferred type is chosen, in the order Direct, then Relay, then Tor. Failing
/// that, the first entry is used. Returns `None` only for an empty slice.
fn choose_path_index(classes: &[(ipc::ConnType, bool)]) -> Option<usize> {
    const PREFERENCE: [ipc::ConnType; 3] = [
        ipc::ConnType::Direct,
        ipc::ConnType::Relay,
        ipc::ConnType::Tor,
    ];
    classes
        .iter()
        .position(|&(_, selected)| selected)
        .or_else(|| {
            PREFERENCE
                .iter()
                .find_map(|want| classes.iter().position(|(ct, _)| ct == want))
        })
        .or_else(|| (!classes.is_empty()).then_some(0))
}
/// Reports whether the hostname recorded in the blob satisfies a pending rename.
///
/// The rename counts as satisfied when the blob name equals `pending` exactly,
/// or is `pending` followed by `-` and one or more ASCII digits (a
/// collision-suffixed variant such as `host-2`). With no blob name, it is not
/// satisfied.
fn rename_satisfied(pending: &str, blob: Option<&str>) -> bool {
    let Some(name) = blob else {
        return false;
    };
    if name == pending {
        return true;
    }
    let Some(suffix) = name
        .strip_prefix(pending)
        .and_then(|rest| rest.strip_prefix('-'))
    else {
        return false;
    };
    !suffix.is_empty() && suffix.chars().all(|c| c.is_ascii_digit())
}
/// Re-delivers an unconfirmed hostname rename to every other coordinator.
///
/// Reads `pending_hostname` from the persisted config for `network_name`. If no
/// rename is pending, this is a no-op. Otherwise it dials each coordinator in
/// `roster` except ourselves over `alpn` and sends a `MeshHello` carrying the
/// pending name and `device_cert`. Each step's failure is logged and left for a
/// later call to retry; success is logged only after the message was actually
/// written.
///
/// NOTE(review): the connection and streams are dropped right after sending. If
/// the transport does not keep the connection alive elsewhere, delivery may
/// depend on those retries. Confirm.
async fn drain_pending_rename(
    endpoint: &Endpoint,
    roster: &[Member],
    alpn: &[u8],
    network_name: &str,
    my_identity: EndpointId,
    my_ip: Ipv4Addr,
    device_cert: &Option<control::DeviceCert>,
) {
    let pending = match config::load_network(network_name) {
        Ok(Some(net)) => net.pending_hostname,
        _ => None,
    };
    let Some(pending) = pending else {
        return;
    };
    let coordinators: Vec<&Member> = roster
        .iter()
        .filter(|m| m.is_coordinator && m.identity != my_identity)
        .collect();
    tracing::info!(
        network = %network_name,
        hostname = %pending,
        coordinators = coordinators.len(),
        "pending rename outstanding; delivering MeshHello to coordinator set"
    );
    if coordinators.is_empty() {
        tracing::warn!(
            network = %network_name,
            hostname = %pending,
            "no other coordinator in roster to deliver pending rename to; will retry on next reconverge/backstop"
        );
    }
    for m in coordinators {
        let conn = match transport::connect_to_peer_with_alpn(endpoint, m.identity, alpn).await {
            Ok(conn) => conn,
            Err(e) => {
                tracing::debug!(
                    network = %network_name,
                    coordinator = %m.identity.fmt_short(),
                    error = %e,
                    "could not reach coordinator to deliver pending rename; will retry"
                );
                continue;
            }
        };
        // Keep `_recv` bound until the send completes, as before.
        let (mut send, _recv) = match conn.open_bi().await {
            Ok(streams) => streams,
            Err(e) => {
                tracing::debug!(
                    network = %network_name,
                    coordinator = %m.identity.fmt_short(),
                    error = %e,
                    "could not open control stream to coordinator for pending rename; will retry"
                );
                continue;
            }
        };
        let hello = ControlMsg::MeshHello {
            identity: my_identity,
            ip: my_ip,
            hostname: Some(pending.clone()),
            device_cert: device_cert.clone(),
        };
        match control::send_msg(&mut send, &hello).await {
            Ok(_) => tracing::info!(
                network = %network_name,
                coordinator = %m.identity.fmt_short(),
                hostname = %pending,
                "re-sent pending rename to coordinator"
            ),
            Err(e) => tracing::debug!(
                network = %network_name,
                coordinator = %m.identity.fmt_short(),
                error = %e,
                "failed to send pending rename to coordinator; will retry"
            ),
        }
    }
}
/// Reports whether the persisted config for `network_name` has an unconfirmed
/// hostname rename. A load error or a missing network counts as "no".
fn has_pending_hostname(network_name: &str) -> bool {
    config::load_network(network_name)
        .ok()
        .flatten()
        .is_some_and(|net| net.pending_hostname.is_some())
}
/// Hostname to announce for `network_name`: the pending rename if there is one,
/// otherwise the persisted `my_hostname`. Returns `None` if the config cannot be
/// loaded or the network is unknown.
fn outgoing_hostname(network_name: &str) -> Option<String> {
    let net = config::load_network(network_name).ok().flatten()?;
    net.pending_hostname.or(net.my_hostname)
}
/// Syncs the DNS tables for `network_name` with the roster, reconciling our own
/// entry with the persisted rename state.
///
/// Every member with a hostname contributes `(hostname, ipv4, derived ipv6)`.
/// The persisted network config is then consulted, and the DNS tables are always
/// synced at the end (even if the config load fails):
/// - If a `pending_hostname` exists and the blob's name for us does not satisfy
///   it (per `rename_satisfied`), the pending name stays queued. When we appear
///   in the roster, our DNS entry is replaced by the pending name, and
///   `my_hostname` is set to the pending name.
/// - Otherwise any pending name is cleared as confirmed, and `my_hostname` is
///   synced to the blob's name for us. The config is saved only if something
///   changed.
///
/// `config::save_network` errors are ignored.
async fn apply_roster_to_dns(
    members: &[Member],
    network_name: &str,
    my_identity: EndpointId,
    hostname_table: &dns::HostnameTable,
    reverse_table: &dns::ReverseLookupTable,
) {
    let mut entries: Vec<(String, Ipv4Addr, std::net::Ipv6Addr)> = members
        .iter()
        .filter_map(|m| {
            m.hostname
                .as_ref()
                .map(|h| (h.clone(), m.ip, derive_ipv6(&m.identity)))
        })
        .collect();
    // Our own hostname as recorded in the (signed) roster, if any.
    let blob_self = members
        .iter()
        .find(|m| m.identity == my_identity)
        .and_then(|m| m.hostname.clone());
    if let Ok(Some(mut net)) = config::load_network(network_name) {
        match net.pending_hostname.clone() {
            Some(pending) if !rename_satisfied(&pending, blob_self.as_deref()) => {
                tracing::info!(
                    network = %network_name,
                    pending = %pending,
                    blob = blob_self.as_deref().unwrap_or("<none>"),
                    "rename still unconfirmed by signed blob; holding local name and keeping it queued for delivery"
                );
                if let Some(me) = members.iter().find(|m| m.identity == my_identity) {
                    let v6 = derive_ipv6(&my_identity);
                    // Drop any entry the roster gives our IPv4 so that only the
                    // pending name is served for it.
                    entries.retain(|(_, v4, _)| *v4 != me.ip);
                    entries.push((pending.clone(), me.ip, v6));
                }
                if net.my_hostname.as_deref() != Some(pending.as_str()) {
                    net.my_hostname = Some(pending);
                    let _ = config::save_network(&net);
                }
            }
            // No pending rename, or the blob already satisfies it.
            pending => {
                let mut dirty = false;
                if let Some(p) = &pending {
                    tracing::info!(
                        network = %network_name,
                        requested = %p,
                        confirmed = blob_self.as_deref().unwrap_or("<none>"),
                        "rename confirmed by signed blob; clearing pending intent"
                    );
                    net.pending_hostname = None;
                    dirty = true;
                }
                if let Some(mine) = blob_self.clone()
                    && net.my_hostname.as_deref() != Some(mine.as_str())
                {
                    net.my_hostname = Some(mine);
                    dirty = true;
                }
                if dirty {
                    let _ = config::save_network(&net);
                }
            }
        }
    }
    dns::sync_network_hostnames(hostname_table, reverse_table, network_name, &entries).await;
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn now_secs() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
async fn update_snapshot_and_publish(
state: &Arc<std::sync::RwLock<NetworkState>>,
blob_store: &FsStore,
dht_notify: &Option<Arc<tokio::sync::Notify>>,
) {
let snap_bytes = {
let mut s = state.write().unwrap();
s.refresh_snapshot();
s.snapshot.as_ref().map(|snap| snap.msgpack_bytes.clone())
};
if let Some(bytes) = snap_bytes {
let _ = blob_store.blobs().add_slice(&bytes).await;
}
if let Some(notify) = dht_notify {
notify.notify_one();
}
}
/// Outcome of a mesh join attempt.
///
/// The stray `#[allow(clippy::too_many_arguments)]` that sat on this enum has
/// been removed; that lint only applies to functions.
enum JoinResult {
    /// Admitted to the network; carries the shared state for it.
    Joined(Arc<std::sync::RwLock<NetworkState>>),
    /// The coordinator queued the request for operator approval.
    Pending,
}
/// Result of a join attempt expressed for the IPC layer.
///
/// NOTE(review): not used within this chunk. Presumably `Joined` carries the IPC
/// reply for a successful join, and `Pending` mirrors `JoinResult::Pending`.
/// Confirm at the call sites.
enum TryJoin {
    Joined(IpcMessage),
    Pending,
}
#[allow(clippy::too_many_arguments)]
async fn join_mesh_shared(
initial_conn: Connection,
ep: &Endpoint,
network_name: &str,
identity: &IrohIdentityProvider,
alpn: &[u8],
my_hostname: Option<String>,
peers: PeerTable,
tun_tx: mpsc::Sender<Bytes>,
disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
token: CancellationToken,
stats: Arc<ForwardMetrics>,
blob_store: FsStore,
firewall: SharedFirewall,
net_pubkey: EndpointId,
device_cert: Option<control::DeviceCert>,
device_user_map: peers::DeviceUserMap,
hostname_table: dns::HostnameTable,
reverse_table: dns::ReverseLookupTable,
invite_secret: Option<Vec<u8>>,
suggested_firewall: SuggestedFirewall,
reusable_keys: BTreeMap<String, crate::membership::ReusableKey>,
auto_accept_firewall: bool,
initial: bool,
promote_tx: mpsc::Sender<String>,
invite_lock: Arc<tokio::sync::Mutex<()>>,
pending_pongs: Arc<DashMap<u64, tokio::sync::oneshot::Sender<()>>>,
) -> Result<JoinResult> {
let my_identity = identity.local_identity();
let my_ip = identity.local_ip();
let (members, approved) = if initial {
let (mut send, mut recv) = initial_conn
.open_bi()
.await
.context("open join control stream")?;
control::send_msg(
&mut send,
&ControlMsg::JoinRequest {
invite_secret,
hostname: my_hostname.clone(),
device_cert: device_cert.clone(),
},
)
.await
.context("send join request")?;
let msg = tokio::time::timeout(Duration::from_secs(30), control::recv_msg(&mut recv))
.await
.context("timeout awaiting join response")??;
match msg {
ControlMsg::Welcome { members, approved } => {
tracing::info!(network = %network_name, "welcomed to network");
if let Some(existing) = members
.iter()
.find(|m| m.ip == my_ip && m.identity != my_identity)
{
anyhow::bail!(
"IP collision: {} is already assigned to {}",
my_ip,
existing.identity
);
}
(members, approved)
}
ControlMsg::JoinPending => {
tracing::info!(network = %network_name, "join pending operator approval");
return Ok(JoinResult::Pending);
}
ControlMsg::JoinDenied { reason } => {
anyhow::bail!("join denied: {reason}");
}
other => {
anyhow::bail!("expected Welcome or JoinPending, got {other:?}");
}
}
} else {
let (_send, mut recv) = initial_conn
.accept_bi()
.await
.context("accept control stream")?;
let msg = control::recv_msg(&mut recv).await?;
match msg {
ControlMsg::Welcome { members, approved } => {
tracing::info!(network = %network_name, "welcomed to network");
(members, approved)
}
ControlMsg::JoinApproved { your_ip, members } => {
tracing::info!(ip = %your_ip, network = %network_name, "joined network (legacy)");
(members, vec![])
}
ControlMsg::MemberSync => {
tracing::info!(network = %network_name, "reconnected via peer; reconverging from signed record");
match resolve_signed(ep, net_pubkey).await {
Some((signed, seeds)) => {
match fetch_verified_blob(
ep,
&blob_store,
&peers,
signed,
network_name,
&seeds,
)
.await
{
Some(data) => (data.members, data.approved),
None => (persisted_roster(network_name), vec![]),
}
}
None => (persisted_roster(network_name), vec![]),
}
}
ControlMsg::JoinDenied { reason } => {
anyhow::bail!("join denied: {reason}");
}
other => {
anyhow::bail!("expected Welcome or MemberSync, got {other:?}");
}
}
};
let member_entries: Vec<config::MemberEntry> = members
.iter()
.map(|m| config::MemberEntry {
identity: m.identity,
ip: m.ip,
is_coordinator: m.is_coordinator,
hostname: m.hostname.clone(),
})
.collect();
let approved_config: Vec<config::ApprovedConfigEntry> = approved
.iter()
.map(|a| config::ApprovedConfigEntry {
identity: a.identity,
ip: a.ip,
hostname: a.hostname.clone(),
})
.collect();
let persisted_hostname = members
.iter()
.find(|m| m.identity == my_identity)
.and_then(|m| m.hostname.clone())
.or(my_hostname.clone());
let (direct, pending_hostname) = config::load_network(network_name)?
.map(|n| (n.direct, n.pending_hostname))
.unwrap_or((false, None));
config::save_network(&config::NetworkConfig {
name: network_name.to_string(),
group_mode: GroupMode::Restricted,
my_ip: Some(my_ip),
my_hostname: persisted_hostname,
pending_hostname,
members: member_entries,
approved: approved_config,
network_secret_key: None,
network_public_key: Some(net_pubkey),
transport: None,
auto_accept_firewall,
admins: vec![],
direct,
})?;
if !initial {
let (mut send, _recv) = initial_conn.open_bi().await?;
control::send_msg(
&mut send,
&ControlMsg::MeshHello {
identity: my_identity,
ip: my_ip,
hostname: outgoing_hostname(network_name),
device_cert: device_cert.clone(),
},
)
.await?;
}
let remote_id = initial_conn.remote_id();
let remote_ip = identity.derive_ip(&remote_id);
crate::spawn_path_logger(initial_conn.clone(), remote_id.fmt_short().to_string());
let remote_ipv6 = derive_ipv6(&remote_id);
peers.add(
remote_ip,
remote_ipv6,
initial_conn.clone(),
remote_id,
network_name,
);
forward::spawn_peer_reader(
initial_conn.clone(),
remote_id,
remote_ip,
remote_ipv6,
network_name.to_string(),
firewall.clone(),
tun_tx.clone(),
disconnect_tx.clone(),
token.clone(),
stats.clone(),
device_user_map.clone(),
);
for member in &members {
if member.identity == my_identity || member.identity == initial_conn.remote_id() {
continue;
}
match transport::connect_to_peer_with_alpn(ep, member.identity, alpn).await {
Ok(conn) => {
let (mut send, _recv) = conn.open_bi().await?;
control::send_msg(
&mut send,
&ControlMsg::MeshHello {
identity: my_identity,
ip: my_ip,
hostname: outgoing_hostname(network_name),
device_cert: device_cert.clone(),
},
)
.await?;
let member_ipv6 = derive_ipv6(&member.identity);
peers.add(
member.ip,
member_ipv6,
conn.clone(),
member.identity,
network_name,
);
forward::spawn_peer_reader(
conn,
member.identity,
member.ip,
member_ipv6,
network_name.to_string(),
firewall.clone(),
tun_tx.clone(),
disconnect_tx.clone(),
token.clone(),
stats.clone(),
device_user_map.clone(),
);
tracing::info!(peer_ip = %member.ip, "connected to mesh peer");
}
Err(e) => {
tracing::warn!(peer_ip = %member.ip, error = %e, "mesh peer unavailable");
}
}
}
let live_state = {
let mut ns = NetworkState {
members: MemberList::from_members(members.clone()),
approved: ApprovedList::from_entries(approved),
snapshot: None,
network_secret_key: None,
network_public_key: net_pubkey,
network_name: Some(network_name.to_string()),
mode: GroupMode::Restricted,
suggested_firewall,
reusable_keys,
pending_suggestions: Vec::new(),
pending: HashMap::new(),
};
ns.refresh_snapshot();
if let Some(snap) = &ns.snapshot {
let _ = blob_store.blobs().add_slice(&snap.msgpack_bytes).await;
}
Arc::new(std::sync::RwLock::new(ns))
};
apply_suggested_firewall(&firewall, my_identity, network_name, &live_state);
let reconverge_notify = Arc::new(tokio::sync::Notify::new());
tokio::spawn({
let notify = reconverge_notify.clone();
let token = token.clone();
let live_state = live_state.clone();
let network_name = network_name.to_string();
let blob_store = blob_store.clone();
let peers_w = peers.clone();
let endpoint_w = ep.clone();
let hostname_table_w = hostname_table.clone();
let reverse_table_w = reverse_table.clone();
let firewall_w = firewall.clone();
let my_identity_w = my_identity;
let net_pubkey_w = net_pubkey;
let alpn_w = alpn.to_vec();
let my_ip_w = my_ip;
let device_cert_w = device_cert.clone();
async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(30));
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = token.cancelled() => return,
_ = notify.notified() => {}
_ = tick.tick() => {
if !has_pending_hostname(&network_name) {
continue;
}
tracing::debug!(
network = %network_name,
"backstop tick: pending rename outstanding, reconverging to retry delivery"
);
}
}
tokio::select! {
_ = token.cancelled() => return,
_ = tokio::time::sleep(std::time::Duration::from_millis(300)) => {}
}
reconverge_and_apply(
&endpoint_w, &blob_store, &peers_w, net_pubkey_w,
&network_name, &live_state, my_identity_w,
&hostname_table_w, &reverse_table_w, &firewall_w,
&alpn_w, my_ip_w, &device_cert_w,
).await;
}
}
});
tokio::spawn({
let initial_conn = initial_conn.clone();
let token = token.clone();
let live_state = live_state.clone();
let network_name = network_name.to_string();
let peers_c = peers.clone();
let endpoint_c = ep.clone();
let my_identity_c = my_identity;
let net_pubkey_c = net_pubkey;
let promote_tx = promote_tx.clone();
let invite_lock = invite_lock.clone();
let reconverge_notify = reconverge_notify.clone();
let pending_pongs = pending_pongs.clone();
async move {
let mut gate = crate::ratelimit::ControlGate::new();
loop {
tokio::select! {
_ = token.cancelled() => return,
result = initial_conn.accept_bi() => {
match result {
Ok((_send, mut recv)) => {
let msg = match control::recv_msg(&mut recv).await {
Ok(m) => m,
Err(_) => continue,
};
match gate.check() {
crate::ratelimit::Verdict::Allow => {}
crate::ratelimit::Verdict::Drop => continue,
crate::ratelimit::Verdict::Close => {
tracing::warn!(peer = %remote_id.fmt_short(), "control-plane flood; closing connection");
initial_conn.close(VarInt::from_u32(forward::ABUSE_CODE), b"control flood");
return;
}
}
match msg {
ControlMsg::MemberApproved { identity, ip, hostname, .. } => {
let entry = ApprovedEntry { identity, ip, hostname, user_identity: None, device_cert: None, collision_index: 0 };
let mut s = live_state.write().unwrap();
let members = s.members.clone();
let _ = s.approved.approve(entry, &members);
}
ControlMsg::MemberSync => {
reconverge_notify.notify_one();
}
ControlMsg::BlobUpdated => {
reconverge_notify.notify_one();
}
ControlMsg::AdminGrant { network_pubkey, secret_key } => {
if network_pubkey != net_pubkey_c {
tracing::warn!(
peer = %remote_id.fmt_short(),
"admin grant for a different network; ignoring"
);
continue;
}
if !admin_grant_key_valid(secret_key, net_pubkey_c) {
tracing::warn!(
peer = %remote_id.fmt_short(),
"admin grant key does not match network pubkey; ignoring"
);
continue;
}
let key = SecretKey::from(secret_key);
if let Ok(Some(mut net)) = config::load_network(&network_name) {
net.network_secret_key = Some(key.clone());
let _ = config::save_network(&net);
}
let endpoint_id = endpoint_c.id();
{
let mut s = live_state.write().unwrap();
s.network_secret_key = Some(key.clone());
if let Some(m) = s.members.get_mut(&my_identity_c) {
m.is_coordinator = true;
}
s.refresh_snapshot();
}
if let Ok(client) = dht::create_pkarr_client(&endpoint_c) {
spawn_lazy_publisher(
client,
key,
live_state.clone(),
endpoint_id,
peers_c.clone(),
network_name.clone(),
token.clone(),
);
tracing::info!(
network = %network_name,
"promoted to co-coordinator; lazy publisher started"
);
}
let _ = promote_tx.send(network_name.clone()).await;
}
ControlMsg::InviteShare { id, secret_hash, expires } => {
if !sender_is_coordinator(&live_state, remote_id) {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteShare from non-coordinator");
continue;
}
let Ok(hash) = String::from_utf8(secret_hash) else {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteShare with non-utf8 hash");
continue;
};
let _guard = invite_lock.lock().await;
if let Ok(mut store) = crate::invite::InviteStore::load(&network_name) {
let _ = store.record_shared(id, hash, expires);
}
}
ControlMsg::InviteUsed { secret_hash } => {
if !sender_is_coordinator(&live_state, remote_id) {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteUsed from non-coordinator");
continue;
}
let Ok(hash) = String::from_utf8(secret_hash) else {
tracing::warn!(peer = %remote_id.fmt_short(), "ignoring InviteUsed with non-utf8 hash");
continue;
};
let _guard = invite_lock.lock().await;
if let Ok(mut store) = crate::invite::InviteStore::load(&network_name) {
let _ = store.burn_by_hash(&hash);
}
}
ControlMsg::Ping { nonce } => {
respond_pong(&initial_conn, nonce).await;
}
ControlMsg::Pong { nonce } => {
if let Some((_, tx)) = pending_pongs.remove(&nonce) {
let _ = tx.send(());
}
}
_ => {}
}
}
Err(_) => return,
}
}
}
}
}
});
Ok(JoinResult::Joined(live_state))
}
#[allow(clippy::too_many_arguments)]
fn spawn_reconnect_loop(
mut disconnect_rx: mpsc::Receiver<forward::DisconnectEvent>,
ep: Endpoint,
alpn: Vec<u8>,
network_name: String,
my_identity: EndpointId,
my_ip: Ipv4Addr,
_my_hostname: Option<String>,
peers: PeerTable,
tun_tx: mpsc::Sender<Bytes>,
disconnect_tx: mpsc::Sender<forward::DisconnectEvent>,
token: CancellationToken,
stats: Arc<ForwardMetrics>,
firewall: SharedFirewall,
device_cert: Option<control::DeviceCert>,
device_user_map: peers::DeviceUserMap,
) -> JoinHandle<()> {
use tracing::Instrument as _;
let span = tracing::info_span!("reconnect", net = %network_name);
let reconnect_loop = async move {
loop {
let event = tokio::select! {
_ = token.cancelled() => return,
event = disconnect_rx.recv() => match event {
Some(ev) => ev,
None => return,
},
};
let peer_id = event.endpoint_id;
let peer_ip = event.ip;
let peer_ipv6 = event.ipv6;
peers.remove_peer_from_network(&peer_ip, &peer_ipv6, &event.network);
if event.intentional {
tracing::info!(peer = %peer_id.fmt_short(), ip = %peer_ip, "peer left, not reconnecting");
continue;
}
tracing::info!(peer = %peer_id.fmt_short(), ip = %peer_ip, "peer disconnected, will reconnect");
let ep = ep.clone();
let alpn = alpn.clone();
let network_name = network_name.clone();
let peers = peers.clone();
let tun_tx = tun_tx.clone();
let disconnect_tx = disconnect_tx.clone();
let token = token.clone();
let stats = stats.clone();
let firewall = firewall.clone();
let device_cert = device_cert.clone();
let device_user_map = device_user_map.clone();
tokio::spawn(async move {
let mut backoff = BACKOFF_INITIAL;
loop {
if token.is_cancelled() {
return;
}
tracing::info!(peer = %peer_id.fmt_short(), secs = backoff.as_secs(), "reconnecting in");
tokio::select! {
_ = token.cancelled() => return,
_ = tokio::time::sleep(backoff) => {}
}
backoff = (backoff * 2).min(BACKOFF_MAX);
match transport::connect_to_peer_with_alpn(&ep, peer_id, &alpn).await {
Ok(conn) => {
let (mut send, _) = match conn.open_bi().await {
Ok(bi) => bi,
Err(e) => {
tracing::warn!(error = %e, "reconnect handshake failed");
continue;
}
};
if let Err(e) = control::send_msg(
&mut send,
&ControlMsg::MeshHello {
identity: my_identity,
ip: my_ip,
hostname: outgoing_hostname(&network_name),
device_cert: device_cert.clone(),
},
)
.await
{
tracing::warn!(error = %e, "reconnect MeshHello failed");
continue;
}
tracing::info!(peer = %peer_id.fmt_short(), ip = %peer_ip, "reconnected to peer");
peers.add(peer_ip, peer_ipv6, conn.clone(), peer_id, &network_name);
forward::spawn_peer_reader(
conn,
peer_id,
peer_ip,
peer_ipv6,
network_name,
firewall,
tun_tx,
disconnect_tx,
token,
stats,
device_user_map,
);
return;
}
Err(e) => {
tracing::debug!(error = %e, "reconnect attempt failed");
}
}
}
});
}
};
tokio::spawn(reconnect_loop.instrument(span))
}
/// Sends a single `ControlMsg::MemberSync` to `conn` on a freshly opened
/// bidirectional stream.
///
/// Best-effort: a failure to open the stream or to send the message is
/// silently ignored.
async fn send_member_sync(conn: &Connection) {
    // Keep the receive half bound so it lives until after the send completes.
    let Ok((mut send, _recv)) = conn.open_bi().await else {
        return;
    };
    let _ = control::send_msg(&mut send, &ControlMsg::MemberSync).await;
}
/// Answers a `Ping` by sending `ControlMsg::Pong` with the same `nonce` on a
/// freshly opened bidirectional stream of `conn`.
///
/// Best-effort: stream-open and send errors are ignored.
async fn respond_pong(conn: &Connection, nonce: u64) {
    let pong = ControlMsg::Pong { nonce };
    // Keep the receive half bound so it lives until after the send completes.
    let Ok((mut send, _recv)) = conn.open_bi().await else {
        return;
    };
    let _ = control::send_msg(&mut send, &pong).await;
}
async fn broadcast_member_sync(peers: &PeerTable, exclude_ip: Option<Ipv4Addr>) {
let msg = ControlMsg::MemberSync;
for (ip, conn) in peers.all_connections() {
if Some(ip) == exclude_ip {
continue;
}
if let Ok((mut send, _)) = conn.open_bi().await
&& let Err(e) = control::send_msg(&mut send, &msg).await
{
tracing::warn!(peer_ip = %ip, error = %e, "failed to sync members");
}
}
}
/// Sends `msg` to every connected peer, one fresh bidirectional stream per
/// peer, sequentially.
///
/// Best-effort: stream-open and send errors are ignored for each peer.
async fn broadcast_control_msg(peers: &PeerTable, msg: &ControlMsg) {
    let connections = peers.all_connections();
    for (_peer_ip, conn) in connections {
        let Ok((mut send, _recv)) = conn.open_bi().await else {
            continue;
        };
        let _ = control::send_msg(&mut send, msg).await;
    }
}
#[cfg(test)]
mod report_tests {
    use super::{collect_recent_logs, write_bundle};

    /// `write_bundle` must produce a gzip-compressed tar whose entries are
    /// exactly the file names it was given, nested paths included.
    #[test]
    fn test_write_bundle_is_valid_targz() {
        // A `TempDir` is removed when dropped, including when an assertion below
        // panics. It also gets a unique name, so it cannot collide with a
        // directory left behind by an earlier run that reused this pid.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("bundle.tgz");
        let files = vec![
            ("sysinfo.txt".to_string(), b"rayfish 0.1.0\n".to_vec()),
            (
                "logs/rayfish.log.2026-06-23".to_string(),
                b"hello log\n".to_vec(),
            ),
        ];
        write_bundle(&path, &files).unwrap();

        let f = std::fs::File::open(&path).unwrap();
        let dec = flate2::read::GzDecoder::new(f);
        let mut archive = tar::Archive::new(dec);
        let mut names: Vec<String> = archive
            .entries()
            .unwrap()
            .map(|e| e.unwrap().path().unwrap().to_string_lossy().into_owned())
            .collect();
        names.sort();
        assert_eq!(names, vec!["logs/rayfish.log.2026-06-23", "sysinfo.txt"]);
        // `archive` (holding the open file) is dropped before `dir`, so the
        // directory is removed only after the file handle is closed.
    }

    /// Smoke test: `collect_recent_logs` must not panic. The result is not
    /// inspected because it depends on the log directory of the machine
    /// running the tests.
    #[test]
    fn test_collect_recent_logs_missing_dir_is_empty() {
        let _ = collect_recent_logs();
    }
}
#[cfg(test)]
mod accept_handler_tests {
    use super::*;
    use std::collections::BTreeMap;
    use std::sync::Arc;

    /// Builds an empty restricted-mode `NetworkState` for "test-net" that knows
    /// only the network public key (no secret key, members, or approvals).
    fn make_network_state() -> Arc<std::sync::RwLock<NetworkState>> {
        let net_secret = iroh::SecretKey::from_bytes(&[1u8; 32]);
        let net_pub = net_secret.public();
        Arc::new(std::sync::RwLock::new(NetworkState {
            members: MemberList::new(),
            approved: ApprovedList::new(),
            snapshot: None,
            network_secret_key: None,
            network_public_key: net_pub,
            network_name: Some("test-net".to_string()),
            mode: GroupMode::Restricted,
            suggested_firewall: SuggestedFirewall::default(),
            reusable_keys: BTreeMap::new(),
            pending_suggestions: Vec::new(),
            pending: HashMap::new(),
        }))
    }

    /// Builds a coordinator accept handler whose blob store lives in a fresh
    /// temporary directory.
    ///
    /// The returned `TempDir` guard owns that directory. Keep it alive as long
    /// as the handler is in use: dropping it deletes the directory the
    /// `FsStore` was loaded from. (Previously the guard was dropped on return,
    /// removing the directory before the handler was ever used.)
    async fn sample_coordinator_handler() -> (AcceptHandler, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let blob_store = FsStore::load(tmp.path()).await.unwrap();
        let (tun_tx, _) = tokio::sync::mpsc::channel(1);
        let (disconnect_tx, _) = tokio::sync::mpsc::channel(1);
        let my_key = iroh::SecretKey::from_bytes(&[2u8; 32]);
        let my_id = my_key.public();
        let handler = AcceptHandler::Coordinator(Arc::new(CoordinatorAcceptState {
            network_name: "test-net".to_string(),
            identity: IrohIdentityProvider::new(my_id, 0),
            state: make_network_state(),
            peers: PeerTable::new(),
            tun_tx,
            disconnect_tx,
            token: tokio_util::sync::CancellationToken::new(),
            stats: Arc::new(ForwardMetrics::default()),
            dht_notify: None,
            blob_store,
            firewall: SharedFirewall::new(crate::firewall::FirewallConfig::default()),
            hostname_table: dns::new_hostname_table(),
            reverse_table: dns::new_reverse_table(),
            device_user_map: peers::DeviceUserMap::new(),
            invite_lock: Arc::new(tokio::sync::Mutex::new(())),
            pending_pongs: Arc::new(DashMap::new()),
        }));
        (handler, tmp)
    }

    /// Builds a member accept handler whose blob store lives in a fresh
    /// temporary directory.
    ///
    /// As with [`sample_coordinator_handler`], keep the returned `TempDir`
    /// alive for as long as the handler is used.
    async fn sample_member_handler() -> (AcceptHandler, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let blob_store = FsStore::load(tmp.path()).await.unwrap();
        let (tun_tx, _) = tokio::sync::mpsc::channel(1);
        let (disconnect_tx, _) = tokio::sync::mpsc::channel(1);
        let handler = AcceptHandler::Member(Arc::new(MemberAcceptState {
            network_name: "test-net".to_string(),
            state: make_network_state(),
            peers: PeerTable::new(),
            tun_tx,
            disconnect_tx,
            token: tokio_util::sync::CancellationToken::new(),
            stats: Arc::new(ForwardMetrics::default()),
            blob_store,
            firewall: SharedFirewall::new(crate::firewall::FirewallConfig::default()),
            hostname_table: dns::new_hostname_table(),
            reverse_table: dns::new_reverse_table(),
            device_user_map: peers::DeviceUserMap::new(),
        }));
        (handler, tmp)
    }

    /// A `Member` handler reports non-coordinator; a `Coordinator` handler
    /// reports coordinator.
    #[tokio::test]
    async fn register_replaces_member_handler_with_coordinator() {
        // Named (not `_`) bindings keep the temp directories alive for the test.
        let (member, _member_dir) = sample_member_handler().await;
        assert!(!member.is_coordinator());
        let (coordinator, _coordinator_dir) = sample_coordinator_handler().await;
        assert!(coordinator.is_coordinator());
    }

    /// Holding the network secret key maps to the Coordinator role; otherwise Member.
    #[test]
    fn holds_key_implies_coordinator_role() {
        assert_eq!(role_for_key_holder(true), NetworkRole::Coordinator);
        assert_eq!(role_for_key_holder(false), NetworkRole::Member);
    }

    /// A path flagged as selected wins over an earlier unselected one.
    #[test]
    fn choose_path_prefers_selected() {
        use ipc::ConnType::*;
        let classes = [(Relay, false), (Direct, true)];
        assert_eq!(super::choose_path_index(&classes), Some(1));
    }

    /// With nothing selected, Direct is preferred over Relay and Tor; a lone
    /// Relay path is still chosen.
    #[test]
    fn choose_path_falls_back_to_best_unselected() {
        use ipc::ConnType::*;
        let classes = [(Relay, false), (Direct, false), (Tor, false)];
        assert_eq!(super::choose_path_index(&classes), Some(1));
        let only_relay = [(Relay, false)];
        assert_eq!(super::choose_path_index(&only_relay), Some(0));
    }

    /// No candidate paths yields no choice.
    #[test]
    fn choose_path_empty_is_none() {
        assert_eq!(super::choose_path_index(&[]), None);
    }

    /// A rename counts as satisfied by the exact name or by the name plus a
    /// `-<digits>` collision suffix; anything else (including no name) does not.
    #[test]
    fn rename_satisfied_exact_and_collision_forms() {
        assert!(super::rename_satisfied("scw-iroh", Some("scw-iroh")));
        assert!(super::rename_satisfied("alice", Some("alice-1")));
        assert!(super::rename_satisfied("alice", Some("alice-42")));
        assert!(!super::rename_satisfied("scw-iroh", Some("bell")));
        assert!(!super::rename_satisfied("alice", Some("alice-bob")));
        assert!(!super::rename_satisfied("alice", Some("alicex")));
        assert!(!super::rename_satisfied("alice", Some("alice-")));
        assert!(!super::rename_satisfied("alice", None));
    }

    /// Only a Member is promoted; a Coordinator is left alone.
    #[test]
    fn promote_is_idempotent_decision() {
        assert!(should_promote(NetworkRole::Member));
        assert!(!should_promote(NetworkRole::Coordinator));
    }
}
#[cfg(test)]
mod coordinator_dial_order_tests {
    use super::*;
    use crate::membership::{Member, derive_ip};

    /// Deterministic secret key: first byte is `seed`, the rest are zero.
    fn secret_from_seed(seed: u8) -> iroh::SecretKey {
        let mut key_bytes = [0u8; 32];
        key_bytes[0] = seed;
        iroh::SecretKey::from(key_bytes)
    }

    /// Deterministic endpoint id: the public half of [`secret_from_seed`].
    fn test_id(seed: u8) -> EndpointId {
        secret_from_seed(seed).public()
    }

    /// A member with only identity and coordinator flag set; its IP is
    /// derived from the identity.
    fn member(identity: EndpointId, is_coordinator: bool) -> Member {
        Member {
            identity,
            ip: derive_ip(&identity),
            is_coordinator,
            hostname: None,
            user_identity: None,
            device_cert: None,
            collision_index: 0,
        }
    }

    /// The minter (`b`) is dialed first, then the remaining coordinators.
    /// Non-coordinators (`c`) and ourselves (`me`) are never dialed.
    #[test]
    fn dial_order_puts_minter_first_then_other_coordinators() {
        let (a, b, c, me) = (test_id(1), test_id(2), test_id(3), test_id(9));
        let members = vec![
            member(a, true),
            member(b, true),
            member(c, false),
            member(me, true),
        ];
        assert_eq!(super::coordinator_dial_order(b, &members, me), vec![b, a]);
    }

    /// A granted key is accepted only if its public half equals the network key.
    #[test]
    fn admin_grant_key_accepted_only_when_public_matches_network() {
        let net_secret = secret_from_seed(42);
        let net_pubkey = net_secret.public();
        assert!(super::admin_grant_key_valid(
            net_secret.to_bytes(),
            net_pubkey
        ));

        let forged = secret_from_seed(7);
        assert!(!super::admin_grant_key_valid(forged.to_bytes(), net_pubkey));
    }

    /// Gossip goes only to coordinators other than ourselves.
    #[test]
    fn gossip_targets_are_coordinator_peers_only() {
        let (a, b, c) = (test_id(1), test_id(2), test_id(3));
        let members = vec![member(a, true), member(b, false), member(c, true)];
        let me = a;
        assert_eq!(super::gossip_targets(&members, me), vec![c]);
    }
}
#[cfg(test)]
mod dial_fallback_tests {
    use super::*;

    /// The first `Welcomed` outcome is picked (index 1), ignoring later ones.
    #[test]
    fn dial_fallback_stops_on_first_welcome() {
        let outcomes = vec![
            DialOutcome::Unreachable,
            DialOutcome::Welcomed,
            DialOutcome::Denied,
        ];
        assert_eq!(pick_first_welcome(&outcomes), (1, true));
    }

    /// Without any `Welcomed` outcome the welcome flag is false; the index is
    /// not checked.
    #[test]
    fn dial_fallback_reports_failure_when_all_exhausted() {
        let outcomes = vec![DialOutcome::Unreachable, DialOutcome::Denied];
        let (_, welcomed) = pick_first_welcome(&outcomes);
        assert!(!welcomed);
    }
}