use {
super::{Catalog, Error, Event, SignedPeerEntry},
crate::{
network::{
LocalNode,
error::Success,
link::{Link, LinkError, Protocol},
},
primitives::{Short, UnboundedChannel},
},
core::fmt,
iroh::{
EndpointAddr,
endpoint::Connection,
protocol::{AcceptError, ProtocolHandler},
},
serde::{Deserialize, Serialize},
tokio::sync::{mpsc::UnboundedReceiver, watch},
};
pub(super) struct CatalogSync {
local: LocalNode,
catalog: watch::Sender<Catalog>,
events: UnboundedChannel<Event>,
}
impl Protocol for CatalogSync {
const ALPN: &'static [u8] = b"/mosaik/discovery/sync/1.0";
}
impl CatalogSync {
pub fn new(local: LocalNode, catalog: watch::Sender<Catalog>) -> Self {
Self {
local,
catalog,
events: UnboundedChannel::default(),
}
}
pub const fn events(&mut self) -> &mut UnboundedReceiver<Event> {
self.events.receiver()
}
pub const fn protocol(&self) -> &impl ProtocolHandler {
self
}
pub fn sync_with(
&self,
peer: EndpointAddr,
) -> impl Future<Output = Result<(), Error>> + Send + 'static {
let local = self.local.clone();
let catalog = self.catalog.clone();
let events_tx = self.events.sender().clone();
let peer_id = peer.id;
async move {
let cancel = local.termination().clone();
let mut link = local
.connect_with_cancel::<Self>(peer, cancel)
.await
.map_err(LinkError::from)?;
let local_snapshot = { (*catalog.borrow()).clone() };
let local_snapshot = CatalogSnapshot::from(&local_snapshot);
link
.send(&local_snapshot)
.await
.inspect_err(|e| {
if !e.is_cancelled() {
tracing::warn!(
peer = %Short(&peer_id),
error = %e,
"failed to send local catalog snapshot",
);
}
})
.map_err(LinkError::from)?;
let remote_snapshot = link
.recv::<CatalogSnapshot>()
.await
.inspect_err(|e| {
if !e.is_cancelled() {
tracing::debug!(
error = %e,
peer = %Short(peer_id),
"failed to receive remote catalog snapshot",
);
}
})
.map_err(LinkError::from)?;
catalog.send_if_modified(|catalog| {
let remote_catalog_size = remote_snapshot.0.len();
let local_catalog_size = catalog.iter_signed().count();
let events = catalog.extend_signed(remote_snapshot.0.into_iter());
let mut updates = 0;
let mut insertions = 0;
for event in events {
if matches!(event, Event::PeerDiscovered(_)) {
insertions += 1;
} else if matches!(event, Event::PeerUpdated(_)) {
updates += 1;
}
let _ = events_tx.send(event);
}
tracing::trace!(
peer = %Short(peer_id),
new = %insertions,
updated = %updates,
remote_catalog_size = %remote_catalog_size,
local_catalog_size = %local_catalog_size,
"discovery catalog synced with"
);
updates > 0 || insertions > 0
});
link.close(Success).await.map_err(LinkError::Close)?;
Ok(())
}
}
}
impl fmt::Debug for CatalogSync {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CatalogSync").finish()
}
}
impl ProtocolHandler for CatalogSync {
fn accept(
&self,
connection: Connection,
) -> impl Future<Output = Result<(), AcceptError>> + Send {
let cancel = self.local.termination().clone();
let catalog = self.catalog.clone();
async move {
let remote_id = connection.remote_id();
let mut link = Link::<Self>::accept_with_cancel(connection, cancel)
.await
.inspect_err(|e| {
if !e.is_cancelled() {
tracing::debug!(
error = %e,
peer = %Short(&remote_id),
"failed to accept incoming catalog sync link",
);
}
})?;
let remote_snapshot = link
.recv::<CatalogSnapshot>()
.await
.inspect_err(|e| {
if !e.is_cancelled() {
tracing::debug!(
error = %e,
peer = %Short(&remote_id),
"failed to receive remote catalog snapshot",
);
}
})
.map_err(AcceptError::from_err)?;
let local_snapshot = catalog.borrow().clone();
let local_snapshot = CatalogSnapshot::from(&local_snapshot);
link
.send(&local_snapshot)
.await
.inspect_err(|e| {
if !e.is_cancelled() {
tracing::warn!(
error = %e,
peer = %Short(&link.remote_id()),
"failed to send local catalog snapshot",
);
}
})
.map_err(AcceptError::from_err)?;
drop(local_snapshot);
let catalog = core::pin::pin!(catalog);
catalog.send_if_modified(|catalog| {
let remote_catalog_size = remote_snapshot.0.len();
let local_catalog_size = catalog.iter_signed().count();
let events = catalog.extend_signed(remote_snapshot.0.into_iter());
let mut updates = 0;
let mut insertions = 0;
for event in events {
if matches!(event, Event::PeerDiscovered(_)) {
insertions += 1;
} else if matches!(event, Event::PeerUpdated(_)) {
updates += 1;
}
self.events.send(event);
}
tracing::trace!(
peer = %Short(link.remote_id()),
new = %insertions,
updated = %updates,
remote_catalog_size = %remote_catalog_size,
local_catalog_size = %local_catalog_size,
"discovery catalog synced with"
);
updates > 0 || insertions > 0
});
link.closed().await?;
Ok(())
}
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct CatalogSnapshot(Vec<SignedPeerEntry>);
impl From<&Catalog> for CatalogSnapshot {
fn from(catalog: &Catalog) -> Self {
Self(catalog.iter_signed().cloned().collect())
}
}
impl core::fmt::Debug for CatalogSnapshot {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "CatalogSnapshot[{}]( ", self.0.len())?;
for entry in &self.0 {
write!(
f,
"(peer {}, version {:?}) ",
entry.id(),
entry.update_version()
)?;
}
write!(f, ")")
}
}