use {
super::{Catalog, Config, Error, PeerEntryVersion, SignedPeerEntry},
crate::{
NetworkId,
PeerId,
Signature,
discovery::PeerEntry,
network::{LocalNode, link::Protocol},
primitives::{
IntoIterOrSingle,
Pretty,
Short,
UnboundedChannel,
deserialize,
serialize,
},
},
chrono::Utc,
core::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
},
futures::StreamExt,
iroh::{
EndpointAddr,
address_lookup::AddressLookup,
protocol::ProtocolHandler,
},
iroh_gossip::{
Gossip,
api::{
ApiError as GossipError,
Event as GossipEvent,
GossipReceiver,
GossipSender,
GossipTopic,
},
},
serde::{Deserialize, Serialize},
std::sync::Arc,
tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
oneshot,
watch,
},
tokio_util::sync::CancellationToken,
tracing::error,
};
/// Events produced by the announcement worker and delivered to the owner of
/// [`Announce`] through the receiver returned by [`Announce::events`].
#[derive(Debug, Clone)]
// NOTE(review): lint silenced presumably because `SignedPeerEntry` is much
// larger than the other variant — confirm before boxing it.
#[allow(clippy::large_enum_variant)]
pub enum Event {
/// A peer entry announcement was received for this network and its
/// timestamp was within the configured maximum time drift.
PeerEntryReceived(SignedPeerEntry),
/// A peer announced its graceful departure with a valid signature and a
/// timestamp within the configured maximum time drift. Carries the
/// departing peer's id and the last entry version it reported.
PeerDeparted(PeerId, PeerEntryVersion),
}
/// Handle to the gossip-based announcement protocol.
///
/// Construction (see `Announce::new`) spawns a background worker task; this
/// handle communicates with it through channels and a shared atomic counter.
pub(super) struct Announce {
/// Gossip instance built with this protocol's ALPN; also exposed as the
/// protocol handler via `protocol()`.
gossip: Gossip,
/// Handle to the local node (endpoint, identity, termination token).
local: LocalNode,
/// Network id copied from `local` at construction time; used to filter
/// peers passed to `observe()`.
network_id: NetworkId,
/// Receiving end of the events emitted by the background worker.
events: UnboundedReceiver<Event>,
/// Dial requests sent to the background worker. Each request carries the
/// addresses to dial and a one-shot sender that the worker fires once the
/// dial attempt has been processed.
dials: UnboundedSender<(Vec<EndpointAddr>, oneshot::Sender<()>)>,
/// Gossip neighbor count shared with (and maintained by) the background
/// worker.
neighbors_count: Arc<AtomicUsize>,
}
impl Protocol for Announce {
/// ALPN identifier of the announcement protocol; it is the value handed to
/// the gossip builder in `Announce::new`.
const ALPN: &'static [u8] = b"/mosaik/discovery/announce/1.0";
}
impl Announce {
    /// Builds the announcement protocol handle and starts its background
    /// worker.
    ///
    /// A gossip instance is spawned on the local endpoint using
    /// [`Self::ALPN`], then a [`WorkerLoop`] is launched on its own tokio
    /// task. If the worker returns an error, the error is logged and the
    /// local node's termination token is cancelled.
    pub(super) fn new(
        local: LocalNode,
        config: &Config,
        catalog: watch::Receiver<Catalog>,
    ) -> Self {
        let network_id = *local.network_id();
        let gossip = Gossip::builder()
            .alpn(Self::ALPN)
            .spawn(local.endpoint().clone());

        // Channels connecting this handle with the worker task.
        let (events_tx, events_rx) = unbounded_channel();
        let (dials_tx, dials_rx) = unbounded_channel();

        let neighbors_count = Arc::new(AtomicUsize::new(0));
        let termination = local.termination().clone();

        // Snapshot of the local entry version at start-up; the worker only
        // re-broadcasts when the catalog reports a newer one.
        let last_own_version = catalog.borrow().local().update_version();

        let worker = WorkerLoop {
            config: config.clone(),
            gossip: gossip.clone(),
            local: local.clone(),
            cancel: termination.clone(),
            catalog,
            events: events_tx,
            dials: dials_rx,
            last_own_version,
            neighbors_count: Arc::clone(&neighbors_count),
            messages_in: UnboundedChannel::default(),
            messages_out: UnboundedChannel::default(),
        };

        tokio::spawn(async move {
            match worker.spawn().await {
                Ok(()) => {}
                Err(e) => {
                    error!(
                        error = %e,
                        network_id = %network_id,
                        "Unrecoverable error in discovery protocol, terminating network"
                    );
                    termination.cancel();
                }
            }
        });

        Self {
            gossip,
            local,
            network_id,
            events: events_rx,
            dials: dials_tx,
            neighbors_count,
        }
    }

    /// Returns the receiver of events emitted by the background worker.
    pub const fn events(&mut self) -> &mut UnboundedReceiver<Event> {
        &mut self.events
    }

    /// Returns the shared gossip neighbor counter.
    pub const fn neighbors_count(&self) -> &Arc<AtomicUsize> {
        &self.neighbors_count
    }

    /// Publishes the given peer addresses to the endpoint's address lookup,
    /// asks the worker to dial them, and waits until the worker has
    /// processed the request.
    ///
    /// Returns immediately without dialing if the endpoint's address lookup
    /// is unavailable. A closed worker channel is not reported; the wait
    /// simply ends.
    pub async fn dial<V>(&self, peers: impl IntoIterOrSingle<EndpointAddr, V>) {
        let Ok(addr_lookup) = self.local.endpoint().address_lookup() else {
            return;
        };

        let peers: Vec<_> = peers.iterator().into_iter().collect();
        for addr in &peers {
            addr_lookup.publish(&addr.clone().into());
        }

        let (done_tx, done_rx) = oneshot::channel::<()>();
        let _ = self.dials.send((peers, done_tx));

        // Resolves when the worker signals completion or drops the sender.
        let _ = done_rx.await;
    }

    /// Records the address of a peer that belongs to this network and, when
    /// there are currently no gossip neighbors, asks the worker to dial it.
    ///
    /// Peers from other networks are ignored. The dial request is
    /// fire-and-forget: nobody waits for its completion signal.
    pub fn observe(&self, peer: &PeerEntry) {
        if peer.network_id() != self.network_id {
            return;
        }

        self.local.observe(peer.address());

        if self.neighbors_count.load(Ordering::SeqCst) != 0 {
            return;
        }

        // The receiving half is dropped right away on purpose.
        let (done_tx, _) = oneshot::channel::<()>();
        let _ = self.dials.send((vec![peer.address().clone()], done_tx));
    }

    /// Returns the protocol handler (the gossip instance) for this protocol.
    pub const fn protocol(&self) -> &impl ProtocolHandler {
        &self.gossip
    }
}
/// State owned by the background task that drives the announcement
/// protocol (see `WorkerLoop::spawn`).
struct WorkerLoop {
/// Discovery configuration; the worker reads the bootstrap peers, the
/// maximum time drift and the graceful departure window from it.
config: Config,
/// Gossip instance used to subscribe to the network's topic.
gossip: Gossip,
/// Handle to the local node.
local: LocalNode,
/// Cancellation token; once cancelled the worker announces its departure
/// and stops.
cancel: CancellationToken,
/// Sender of events delivered to the `Announce` handle.
events: UnboundedSender<Event>,
/// Watch receiver of the peer catalog; changes may trigger a re-broadcast
/// of the local entry.
catalog: watch::Receiver<Catalog>,
/// Version of the local entry that was last scheduled for broadcast.
last_own_version: PeerEntryVersion,
/// Queue of inbound announcement messages processed by the worker loop.
// NOTE(review): nothing in the visible code sends to this queue — confirm
// whether it is still needed.
messages_in: UnboundedChannel<AnnouncementMessage>,
/// Queue of announcement messages waiting to be broadcast.
messages_out: UnboundedChannel<AnnouncementMessage>,
/// Gossip neighbor count shared with the `Announce` handle.
neighbors_count: Arc<AtomicUsize>,
/// Dial requests coming from the `Announce` handle, each paired with a
/// one-shot completion sender.
dials: UnboundedReceiver<(Vec<EndpointAddr>, oneshot::Sender<()>)>,
}
impl WorkerLoop {
    /// Runs the announcement worker until the cancellation token fires or an
    /// unrecoverable error occurs.
    ///
    /// Waits for the endpoint to come online, records the bootstrap peers,
    /// subscribes to the network's gossip topic and then multiplexes
    /// cancellation, outbound and inbound messages, gossip events, catalog
    /// updates and dial requests.
    ///
    /// # Errors
    ///
    /// Returns an error when subscribing (or re-subscribing) to the gossip
    /// topic fails.
    async fn spawn(mut self) -> Result<(), Error> {
        self.local.endpoint().online().await;
        self.local.observe(self.config.bootstrap_peers.iter());

        let peer_ids = self.config.bootstrap_peers_ids();
        let topic_id = self.local.network_id().into();
        let (mut topic_tx, mut topic_rx) =
            self.gossip.subscribe(topic_id, peer_ids).await?.split();

        loop {
            tokio::select! {
                // Termination requested: say goodbye and stop.
                () = self.cancel.cancelled() => {
                    self.shutdown(&topic_tx, &topic_rx).await;
                    return Ok(());
                }
                // Outbound messages are only drained while the topic is
                // joined; otherwise they stay queued.
                Some(outbound) = self.messages_out.recv(), if topic_rx.is_joined() => {
                    self.broadcast_message(
                        &mut topic_tx,
                        &mut topic_rx,
                        outbound
                    ).await?;
                }
                Some(inbound) = self.messages_in.recv() => {
                    self.on_message_received(inbound);
                }
                gossip_event = topic_rx.next() => {
                    self.on_topic_rx(gossip_event, &mut topic_rx, &mut topic_tx).await?;
                }
                Ok(()) = self.catalog.changed() => {
                    self.on_catalog_update();
                }
                Some((peers, tx)) = self.dials.recv() => {
                    self.dial_peers(peers, &topic_tx, tx).await;
                }
            }
        }
    }

    /// Subscribes to the network's gossip topic using the bootstrap peers
    /// plus every non-local peer currently in the catalog.
    ///
    /// # Errors
    ///
    /// Returns an error when the gossip subscription fails.
    async fn join_gossip_topic(&self) -> Result<GossipTopic, Error> {
        let topic_id = self.local.network_id().into();
        let mut peers = self.config.bootstrap_peers_ids();
        for peer in self.catalog.borrow().peers() {
            if *peer.id() != self.local.id() {
                peers.push(*peer.id());
            }
        }
        let topic = self.gossip.subscribe(topic_id, peers).await?;
        Ok(topic)
    }

    /// Handles one item polled from the gossip topic receiver.
    ///
    /// A finished stream or any receive error triggers a re-join of the
    /// topic; a regular event is forwarded to [`Self::on_gossip_event`].
    ///
    /// # Errors
    ///
    /// Returns an error when re-joining the topic fails.
    async fn on_topic_rx(
        &self,
        gossip_event: Option<Result<GossipEvent, GossipError>>,
        topic_rx: &mut GossipReceiver,
        topic_tx: &mut GossipSender,
    ) -> Result<(), Error> {
        match gossip_event {
            None | Some(Err(GossipError::Closed { .. })) => {
                tracing::warn!(
                    network = %self.local.network_id(),
                    "announcement gossip network connection lost, attempting to re-join"
                );
                self.rejoin_topic(topic_tx, topic_rx).await?;
            }
            Some(Err(e)) => {
                tracing::warn!(
                    error = %e,
                    network = %self.local.network_id(),
                    "announcement gossip network down"
                );
                self.rejoin_topic(topic_tx, topic_rx).await?;
            }
            Some(Ok(event)) => {
                self.on_gossip_event(event, topic_tx).await;
            }
        }
        Ok(())
    }

    /// Reacts to a single gossip event: tracks neighbors going up and down,
    /// decodes and processes received messages, and resets the neighbor
    /// count when the receiver reports that it lagged.
    async fn on_gossip_event(&self, event: GossipEvent, topic_tx: &GossipSender) {
        match event {
            GossipEvent::NeighborUp(id) => {
                self.increment_neighbor_count();
                tracing::trace!(
                    network = %self.local.network_id(),
                    peer_id = %Short(&id),
                    neighbors = self.neighbors_count.load(Ordering::SeqCst),
                    "New gossip neighbor connected"
                );
                // Let the new neighbor learn about the local entry.
                self.broadcast_self_info();
            }
            GossipEvent::NeighborDown(id) => {
                self.decrement_neighbor_count(topic_tx).await;
                tracing::trace!(
                    network = %self.local.network_id(),
                    peer_id = %Short(&id),
                    neighbors = self.neighbors_count.load(Ordering::SeqCst),
                    "Gossip neighbor disconnected"
                );
            }
            GossipEvent::Received(message) => {
                let Ok(decoded) = deserialize(&message.content) else {
                    tracing::warn!(
                        network = %self.local.network_id(),
                        "failed to decode announcement message"
                    );
                    return;
                };
                self.on_message_received(decoded);
            }
            GossipEvent::Lagged => {
                // Resetting the count to zero also re-dials known peers.
                self.set_neighbor_count(0, topic_tx).await;
            }
        }
    }

    /// Validates an announcement message and, if accepted, emits the
    /// corresponding [`Event`].
    ///
    /// Entry updates are dropped when they belong to another network or when
    /// their timestamp differs from the local clock by more than
    /// `max_time_drift`. Graceful departures are dropped when their signature
    /// is invalid or their timestamp is outside the same window.
    fn on_message_received(&self, message: AnnouncementMessage) {
        match message {
            AnnouncementMessage::OwnEntryUpdate(entry) => {
                if entry.network_id() != self.local.network_id() {
                    tracing::trace!(
                        peer_network = %Short(entry.network_id()),
                        this_network = %Short(self.local.network_id()),
                        "received peer entry from different network, ignoring"
                    );
                    return;
                }

                let Ok(time_diff) = (Utc::now() - entry.updated_at()).abs().to_std()
                else {
                    tracing::trace!(
                        peer_id = %Short(&entry.id()),
                        network = %Short(entry.network_id()),
                        "ignoring discovery entry with invalid timestamp"
                    );
                    return;
                };

                if time_diff > self.config.max_time_drift {
                    tracing::trace!(
                        peer_id = %Short(&entry.id()),
                        network = %Short(entry.network_id()),
                        time_diff = ?time_diff,
                        max_drift = ?self.config.max_time_drift,
                        "ignoring discovery entry with stale timestamp"
                    );
                    return;
                }

                // A closed events channel is not an error for the worker.
                let _ = self.events.send(Event::PeerEntryReceived(entry));
            }
            AnnouncementMessage::GracefulDeparture(departure) => {
                if !departure.has_valid_signature() {
                    tracing::trace!(
                        peer_id = %Short(&departure.peer_id),
                        "received graceful departure with invalid signature, ignoring"
                    );
                    return;
                }

                // An unrepresentable difference is treated as maximally far
                // away so that it fails the drift check below.
                let time_diff = (Utc::now() - departure.timestamp)
                    .abs()
                    .to_std()
                    .unwrap_or(Duration::MAX);

                if time_diff > self.config.max_time_drift {
                    tracing::trace!(
                        peer_id = %Short(&departure.peer_id),
                        time_diff = ?time_diff,
                        max_drift = ?self.config.max_time_drift,
                        "received graceful departure with invalid timestamp, ignoring"
                    );
                    return;
                }

                let _ = self.events.send(Event::PeerDeparted(
                    departure.peer_id,
                    departure.last_version,
                ));
            }
        }
    }

    /// Queues a broadcast of the local entry when the catalog reports a
    /// newer local entry version than the one last announced.
    fn on_catalog_update(&mut self) {
        let current_local_version = self.catalog.borrow().local().update_version();
        if current_local_version > self.last_own_version {
            self.broadcast_self_info();
            self.last_own_version = current_local_version;
        }
    }

    /// Queues the current local catalog entry for broadcast.
    fn broadcast_self_info(&self) {
        let entry = self.catalog.borrow().local().clone();
        tracing::trace!(
            peer_info = ?Pretty(&entry),
            network = %self.local.network_id(),
            "broadcasting local"
        );
        self
            .messages_out
            .send(AnnouncementMessage::OwnEntryUpdate(entry));
    }

    /// Broadcasts `message` on the gossip topic.
    ///
    /// If the topic is not joined the message is put back on the outbound
    /// queue. After a successful broadcast the neighbor count is refreshed
    /// from the receiver. A failed broadcast is logged; if the topic was
    /// closed, a re-join is attempted.
    ///
    /// # Errors
    ///
    /// Returns an error when re-joining the topic fails.
    async fn broadcast_message(
        &self,
        topic_tx: &mut GossipSender,
        topic_rx: &mut GossipReceiver,
        message: AnnouncementMessage,
    ) -> Result<(), Error> {
        if !topic_rx.is_joined() {
            tracing::debug!(
                network = %self.local.network_id(),
                "not connected to any gossip neighbors, \
                 deferring announcement broadcast"
            );
            self.messages_out.send(message);
            return Ok(());
        }

        if let Err(e) = topic_tx.broadcast(serialize(&message)).await {
            tracing::warn!(
                error = %e,
                network = %self.local.network_id(),
                message = ?message,
                "failed to broadcast announcement message"
            );
            if matches!(e, GossipError::Closed { .. }) {
                tracing::warn!(
                    network = %self.local.network_id(),
                    "announcement gossip network connection lost, attempting to re-join"
                );
                self.rejoin_topic(topic_tx, topic_rx).await?;
            }
        } else {
            self
                .set_neighbor_count(topic_rx.neighbors().count(), topic_tx)
                .await;
        }
        Ok(())
    }

    /// Records the given addresses on the local node, asks the gossip topic
    /// to join the corresponding peers, and then signals completion through
    /// `tx`. A failed join is logged; completion is signalled either way.
    async fn dial_peers(
        &self,
        peers: Vec<EndpointAddr>,
        topic_tx: &GossipSender,
        tx: oneshot::Sender<()>,
    ) {
        self.local.observe(peers.iter());
        let peer_ids = peers.into_iter().map(|p| p.id).collect::<Vec<_>>();
        tracing::trace!(
            network = %self.local.network_id(),
            peers = %Short::iter(&peer_ids),
            "Dialing peers"
        );
        if let Err(e) = topic_tx.join_peers(peer_ids).await {
            tracing::warn!(
                error = %e,
                network = %self.local.network_id(),
                "failed to dial peers via announcement gossip network"
            );
        }
        // The requester may have stopped waiting; that is fine.
        let _ = tx.send(());
    }

    /// Resets the neighbor count and replaces both topic halves with a
    /// fresh subscription.
    ///
    /// # Errors
    ///
    /// Returns an error when the new subscription fails; the old halves are
    /// left in place in that case.
    async fn rejoin_topic(
        &self,
        topic_tx: &mut GossipSender,
        topic_rx: &mut GossipReceiver,
    ) -> Result<(), Error> {
        self.neighbors_count.store(0, Ordering::SeqCst);
        let (new_topic_tx, new_topic_rx) = self.join_gossip_topic().await?.split();
        *topic_tx = new_topic_tx;
        *topic_rx = new_topic_rx;
        Ok(())
    }

    /// Broadcasts a signed graceful departure message if the topic is
    /// joined, then waits for `graceful_departure_window` after a successful
    /// broadcast before returning.
    async fn shutdown(self, topic_tx: &GossipSender, topic_rx: &GossipReceiver) {
        tracing::trace!(
            network = %self.local.network_id(),
            peer_id = %Short(&self.local.id()),
            "Discovery announcement protocol shutting down"
        );
        if topic_rx.is_joined() {
            let goodbye = AnnouncementMessage::GracefulDeparture(
                GracefulDeparture::new(&self.local, self.last_own_version),
            );
            if let Err(e) = topic_tx.broadcast(serialize(&goodbye)).await {
                tracing::warn!(
                    error = %e,
                    network = %self.local.network_id(),
                    "failed to broadcast graceful departure message"
                );
            } else {
                tracing::trace!(
                    network = %self.local.network_id(),
                    peer_id = %Short(&self.local.id()),
                    "broadcasted graceful departure message"
                );
                tokio::time::sleep(self.config.graceful_departure_window).await;
            }
        }
    }

    /// Adds one to the shared neighbor counter.
    fn increment_neighbor_count(&self) {
        self.neighbors_count.fetch_add(1, Ordering::SeqCst);
    }

    /// Subtracts one from the shared neighbor counter without ever going
    /// below zero, and re-dials all known peers when no neighbors remain.
    ///
    /// The counter can already be zero when a `NeighborDown` arrives (it is
    /// reset on lag, on re-join and after broadcasts). A plain `fetch_sub`
    /// would wrap to `usize::MAX` in that case, so a saturating update is
    /// used instead.
    async fn decrement_neighbor_count(&self, topic_tx: &GossipSender) {
        let previous = self
            .neighbors_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                Some(count.saturating_sub(1))
            })
            // The closure never returns `None`, so this is always `Ok`.
            .unwrap_or_else(|unchanged| unchanged);

        // `previous <= 1` means the counter is now zero.
        if previous <= 1 {
            self.redial_known_peers(topic_tx).await;
        }
    }

    /// Overwrites the shared neighbor counter and re-dials all known peers
    /// when the new value is zero.
    async fn set_neighbor_count(&self, count: usize, topic_tx: &GossipSender) {
        self.neighbors_count.store(count, Ordering::SeqCst);
        if count == 0 {
            self.redial_known_peers(topic_tx).await;
        }
    }

    /// Records the address of every non-local catalog peer on the local node
    /// and asks the gossip topic to join them. Best effort: a failed join is
    /// only logged.
    async fn redial_known_peers(&self, topic_tx: &GossipSender) {
        // Scope the catalog borrow so it is released before awaiting.
        let peers = {
            let catalog = self.catalog.borrow();
            let mut peers = Vec::with_capacity(catalog.peers_count());
            for peer in catalog.peers() {
                if *peer.id() != self.local.id() {
                    self.local.observe(peer.address());
                    peers.push(*peer.id());
                }
            }
            peers
        };

        if let Err(e) = topic_tx.join_peers(peers).await {
            tracing::debug!(
                error = %e,
                network = %self.local.network_id(),
                "failed to re-dial known peers after losing all gossip neighbors"
            );
        }
    }
}
/// Messages exchanged over the announcement gossip topic. They are
/// serialized before being broadcast and deserialized on receipt.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum AnnouncementMessage {
/// A peer's signed catalog entry; the worker broadcasts its own local
/// entry with this variant.
OwnEntryUpdate(SignedPeerEntry),
/// A signed notice that a peer is shutting down.
GracefulDeparture(GracefulDeparture),
}
/// Signed notice broadcast by a peer when it shuts down.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct GracefulDeparture {
/// Id of the departing peer; also the key the signature is verified
/// against.
peer_id: PeerId,
/// Last entry version reported by the departing peer.
last_version: PeerEntryVersion,
/// Creation time of the notice; only its millisecond value is covered by
/// the signature.
timestamp: chrono::DateTime<Utc>,
/// Signature over the BLAKE3 hash of the peer id, both version components
/// and the timestamp in milliseconds.
signature: Signature,
}
impl GracefulDeparture {
pub fn new(local: &LocalNode, last_version: PeerEntryVersion) -> Self {
let timestamp = Utc::now();
let mut hasher = blake3::Hasher::default();
hasher.update(local.id().as_bytes());
hasher.update(&last_version.0.to_be_bytes());
hasher.update(&last_version.1.to_be_bytes());
hasher.update(×tamp.timestamp_millis().to_be_bytes());
let hash = hasher.finalize();
let signature = local.secret_key().sign(hash.as_bytes());
Self {
peer_id: local.id(),
last_version,
timestamp,
signature,
}
}
pub fn has_valid_signature(&self) -> bool {
let mut hasher = blake3::Hasher::default();
hasher.update(self.peer_id.as_bytes());
hasher.update(&self.last_version.0.to_be_bytes());
hasher.update(&self.last_version.1.to_be_bytes());
hasher.update(&self.timestamp.timestamp_millis().to_be_bytes());
let hash = hasher.finalize();
self
.peer_id
.verify(hash.as_bytes(), &self.signature)
.is_ok()
}
}